70 changes: 70 additions & 0 deletions accounting/aggregations/functions.py
@@ -28,6 +28,7 @@


OSDF_DIRECTOR_SERVER_URL = "https://osdf-director.osg-htc.org/api/v1.0/director_ui/servers"
OSDF_REGISTRY_SERVER_URL = "https://osdf-registry.osg-htc.org/api/v1.0/registry_ui/servers"
UWDF_DIRECTOR_SERVER_URL = "https://uwdf-director.chtc.wisc.edu/api/v1.0/director_ui/servers"
Comment on lines 30 to 32

Collaborator
Might be future work, but maybe we should be getting these from the Metadata Discovery endpoint(s), since it's intended to move around less than a particular Central service address. Also we may be able to fallback to redundant / high-availability Directors in the future?
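
For concreteness, a minimal sketch of that approach, assuming the federation publishes a Pelican-style discovery document at /.well-known/pelican-configuration whose director_endpoint and namespace_registration_endpoint keys point at the current central services (the discovery URL and JSON key names here are assumptions based on Pelican's discovery format, not verified against the deployed OSDF):

# Hedged sketch: resolve central service addresses from federation metadata
# discovery instead of hardcoding them. The discovery URL and key names are
# assumptions, not confirmed against the OSDF deployment.
import json
from urllib.request import urlopen

OSDF_DISCOVERY_URL = "https://osg-htc.org/.well-known/pelican-configuration"

def get_central_service_urls(discovery_url=OSDF_DISCOVERY_URL) -> dict:
    """Return the federation's Director and Registry base URLs."""
    with urlopen(discovery_url) as f:
        metadata = json.load(f)
    return {
        "director": metadata.get("director_endpoint"),
        "registry": metadata.get("namespace_registration_endpoint"),
    }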

INSTITUTION_DATABASE_URL = "https://topology-institutions.osg-htc.org/api/institution_ids"
TOPOLOGY_PROJECT_DATA_URL = "https://topology.opensciencegrid.org/miscproject/xml"
@@ -220,6 +221,75 @@ def get_institution_database(cache_file=Path("./institution_database.pickle")) -
    return institution_db


def get_osdf_endpoint_data(cache_file=Path("./osdf_endpoint_data.pickle")) -> dict:
    """Return a dict mapping OSDF endpoint (host:port) to a data dict.

    Joins director data with registry data via registryPrefix -> registration[].prefix,
    then enriches with institution name and state from the institution database.
    """
    endpoint_map = {}

    # Use cache if less than 20 minutes old
    if cache_file.exists():
        try:
            endpoint_map = pickle.load(cache_file.open("rb"))
        except Exception:
            pass
        if len(endpoint_map) > 0 and cache_file.stat().st_mtime > time.time() - 1200:
            return endpoint_map

    director_servers = get_osdf_director_servers()
    institution_db = get_institution_database()
    topology_resources = get_topology_resource_data()

    # Map registered namespace prefixes to institution IDs, retrying with
    # exponential backoff on HTTP errors and re-raising on the final failure.
    registry_prefix_to_institution_id = {}
    tries = 0
    max_tries = 5
    while tries < max_tries:
        try:
            with urlopen(OSDF_REGISTRY_SERVER_URL) as f:
                for entry in json.load(f):
                    for reg in entry.get("registration", []):
                        prefix = reg.get("prefix", "")
                        institution_id = reg.get("admin_metadata", {}).get("institution")
                        if prefix and institution_id:
                            registry_prefix_to_institution_id[prefix] = institution_id
        except HTTPError:
            time.sleep(2**tries)
            tries += 1
            if tries == max_tries:
                raise
        else:
            break

    for url, server in director_servers.items():
        registry_prefix = server.get("registryPrefix", "")
        institution_id = registry_prefix_to_institution_id.get(registry_prefix)

        # Fall back to topology resource data keyed by server name
        if not institution_id:
            server_name = server.get("name", "")
            institution_id = topology_resources.get(server_name.lower(), {}).get("institution_id")

        inst = institution_db.get(institution_id, {})
        lat = server.get("latitude")
        lon = server.get("longitude")
        netloc = url.split("//")[-1]
        endpoint_map[netloc] = {
            "name": server.get("name") or None,
            "id": server.get("serverId") or None,
            "type": server.get("type") or None,
            "institution": inst.get("name"),
            "institution_id": institution_id,
            "latitude": float(lat) if lat is not None else None,
            "longitude": float(lon) if lon is not None else None,
            "state": inst.get("state"),
        }

    pickle.dump(endpoint_map, cache_file.open("wb"))
    return endpoint_map
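
For reference, a minimal sketch of how a caller might consume the mapping this builds; the netloc key and field values in the comments are illustrative placeholders, not live data:

# Illustrative usage only: the netloc and values below are made up.
endpoint_data = get_osdf_endpoint_data()
for netloc, info in endpoint_data.items():
    # netloc looks like "some-cache.example.edu:8443"; info carries keys
    # "name", "id", "type", "institution", "institution_id",
    # "latitude", "longitude", and "state" (any of which may be None).
    print(netloc, info["type"], info["institution"])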


def get_topology_project_data(cache_file=Path("./topology_project_data.pickle")) -> dict:
    projects_data = {}

23 changes: 8 additions & 15 deletions accounting/aggregations/osdf_report.py
@@ -9,7 +9,7 @@
from datetime import datetime, timedelta
from pathlib import Path

-from functions import send_email, get_osdf_director_servers, get_topology_resource_data
+from functions import send_email, get_topology_resource_data, get_osdf_endpoint_data

import elasticsearch
from elasticsearch_dsl import Search, A, Q
@@ -59,8 +59,8 @@
emit(job_id.hashCode());
"""

-OSDF_DIRECTOR_SERVERS = {}
TOPOLOGY_RESOURCE_DATA = {}
+OSDF_ENDPOINT_DATA = {}


def valid_date(date_str: str) -> datetime:
@@ -216,7 +216,7 @@ def get_endpoint_types(
endpoints = {bucket["key"]: bucket for bucket in result.aggregations.endpoint.buckets}
endpoint_types = {"cache": set(), "origin": set()}
for endpoint, bucket in endpoints.items():
endpoint_type = OSDF_DIRECTOR_SERVERS.get(f"https://{endpoint}", {"type": ""}).get("type", "")
endpoint_type = OSDF_ENDPOINT_DATA.get(endpoint, {"type": ""}).get("type", "")
if (
endpoint_type.lower() == "origin" or
"origin" in endpoint.split(".")[0] or
@@ -367,8 +367,8 @@ def sum_buckets_matching(buckets: dict, pattern: str) -> int:
    args.end = args.start + timedelta(days=1)
days = (args.end - args.start).days

-OSDF_DIRECTOR_SERVERS = get_osdf_director_servers(cache_file=args.cache_dir / "osdf_director_servers.pickle")
TOPOLOGY_RESOURCE_DATA = get_topology_resource_data(cache_file=args.cache_dir / "topology_resource_data.pickle")
+OSDF_ENDPOINT_DATA = get_osdf_endpoint_data(cache_file=args.cache_dir / "osdf_endpoint_data.pickle")

es_args["timeout"] = es_args.pop("es_timeout", None)
if not es_args["timeout"]:
@@ -549,21 +549,14 @@ def sum_buckets_matching(buckets: dict, pattern: str) -> int:
endpoint_data = {"download": [], "upload": []}
for transfer_type, transfer_type_data in all_transfer_type_data.items():
for endpoint in transfer_type_data["endpoint"]:
server_info = OSDF_DIRECTOR_SERVERS.get(f"https://{endpoint}")
endpoint_institution = ""
if server_info:
endpoint_name = server_info.get("name")
if endpoint_name:
endpoint_institution = TOPOLOGY_RESOURCE_DATA.get(endpoint_name.lower(), {"institution": f"Unmapped endpoint {endpoint_name}"})["institution"]
else:
endpoint_name = "Unnamed endpoint"
else:
endpoint_name = "Not currently found*"
server_info = OSDF_ENDPOINT_DATA.get(endpoint, {})
endpoint_name = server_info.get("name", f"{endpoint} not found at director") or f"{endpoint} Not found at director"
endpoint_institution = server_info.get("institution", "Not found at registry") or "Not found at registry"
row = {
"endpoint": endpoint,
"endpoint_institution": endpoint_institution,
"endpoint_name": endpoint_name,
"endpoint_type": OSDF_DIRECTOR_SERVERS.get(f"https://{endpoint}", {"type": ""}).get("type", "") or "Cache*",
"endpoint_type": server_info.get("type") or "Cache*",
}
for attempt_type, attempt_data in {
"total_attempts": all_transfer_type_data,