From 54793e7cc134828084d2128aadba3bc9092ef5db Mon Sep 17 00:00:00 2001 From: Jason Patton Date: Fri, 17 Apr 2026 13:17:00 -0500 Subject: [PATCH] INF-3424 Use registry data where possible for OSDF reporting --- accounting/aggregations/functions.py | 70 ++++++++++++++++++++++++++ accounting/aggregations/osdf_report.py | 23 +++------ 2 files changed, 78 insertions(+), 15 deletions(-) diff --git a/accounting/aggregations/functions.py b/accounting/aggregations/functions.py index 0afd200..d7e2d8f 100644 --- a/accounting/aggregations/functions.py +++ b/accounting/aggregations/functions.py @@ -28,6 +28,7 @@ OSDF_DIRECTOR_SERVER_URL = "https://osdf-director.osg-htc.org/api/v1.0/director_ui/servers" +OSDF_REGISTRY_SERVER_URL = "https://osdf-registry.osg-htc.org/api/v1.0/registry_ui/servers" UWDF_DIRECTOR_SERVER_URL = "https://uwdf-director.chtc.wisc.edu/api/v1.0/director_ui/servers" INSTITUTION_DATABASE_URL = "https://topology-institutions.osg-htc.org/api/institution_ids" TOPOLOGY_PROJECT_DATA_URL = "https://topology.opensciencegrid.org/miscproject/xml" @@ -220,6 +221,75 @@ def get_institution_database(cache_file=Path("./institution_database.pickle")) - return institution_db +def get_osdf_endpoint_data(cache_file=Path("./osdf_endpoint_data.pickle")) -> dict: + """Return a dict mapping OSDF endpoint (host:port) to a data dict. + + Joins director data with registry data via registryPrefix -> registration[].prefix, + then enriches with institution name and state from the institution database. + """ + endpoint_map = {} + + # Use cache if less than 20 minutes old + if cache_file.exists(): + try: + endpoint_map = pickle.load(cache_file.open("rb")) + except Exception: + pass + if len(endpoint_map) > 0 and cache_file.stat().st_mtime > time.time() - 1200: + return endpoint_map + + director_servers = get_osdf_director_servers() + institution_db = get_institution_database() + topology_resources = get_topology_resource_data() + + registry_prefix_to_institution_id = {} + tries = 0 + max_tries = 5 + while tries < max_tries: + try: + with urlopen(OSDF_REGISTRY_SERVER_URL) as f: + for entry in json.load(f): + for reg in entry.get("registration", []): + prefix = reg.get("prefix", "") + institution_id = reg.get("admin_metadata", {}).get("institution") + if prefix and institution_id: + registry_prefix_to_institution_id[prefix] = institution_id + except HTTPError: + time.sleep(2**tries) + tries += 1 + if tries == max_tries: + raise + else: + break + + for url, server in director_servers.items(): + registry_prefix = server.get("registryPrefix", "") + institution_id = registry_prefix_to_institution_id.get(registry_prefix) + + # Fall back to topology resource data keyed by server name + if not institution_id: + server_name = server.get("name", "") + institution_id = topology_resources.get(server_name.lower(), {}).get("institution_id") + + inst = institution_db.get(institution_id, {}) + lat = server.get("latitude") + lon = server.get("longitude") + netloc = url.split("//")[-1] + endpoint_map[netloc] = { + "name": server.get("name") or None, + "id": server.get("serverId") or None, + "type": server.get("type") or None, + "institution": inst.get("name"), + "institution_id": institution_id, + "latitude": float(lat) if lat is not None else None, + "longitude": float(lon) if lon is not None else None, + "state": inst.get("state"), + } + + pickle.dump(endpoint_map, cache_file.open("wb")) + return endpoint_map + + def get_topology_project_data(cache_file=Path("./topology_project_data.pickle")) -> dict: projects_data = {} diff --git a/accounting/aggregations/osdf_report.py b/accounting/aggregations/osdf_report.py index 6bcc971..816c229 100644 --- a/accounting/aggregations/osdf_report.py +++ b/accounting/aggregations/osdf_report.py @@ -9,7 +9,7 @@ from datetime import datetime, timedelta from pathlib import Path -from functions import send_email, get_osdf_director_servers, get_topology_resource_data +from functions import send_email, get_topology_resource_data, get_osdf_endpoint_data import elasticsearch from elasticsearch_dsl import Search, A, Q @@ -59,8 +59,8 @@ emit(job_id.hashCode()); """ -OSDF_DIRECTOR_SERVERS = {} TOPOLOGY_RESOURCE_DATA = {} +OSDF_ENDPOINT_DATA = {} def valid_date(date_str: str) -> datetime: @@ -216,7 +216,7 @@ def get_endpoint_types( endpoints = {bucket["key"]: bucket for bucket in result.aggregations.endpoint.buckets} endpoint_types = {"cache": set(), "origin": set()} for endpoint, bucket in endpoints.items(): - endpoint_type = OSDF_DIRECTOR_SERVERS.get(f"https://{endpoint}", {"type": ""}).get("type", "") + endpoint_type = OSDF_ENDPOINT_DATA.get(endpoint, {"type": ""}).get("type", "") if ( endpoint_type.lower() == "origin" or "origin" in endpoint.split(".")[0] or @@ -367,8 +367,8 @@ def sum_buckets_matching(buckets: dict, pattern: str) -> int: args.end = args.start + timedelta(days=1) days = (args.end - args.start).days - OSDF_DIRECTOR_SERVERS = get_osdf_director_servers(cache_file=args.cache_dir / "osdf_director_servers.pickle") TOPOLOGY_RESOURCE_DATA = get_topology_resource_data(cache_file=args.cache_dir / "topology_resource_data.pickle") + OSDF_ENDPOINT_DATA = get_osdf_endpoint_data(cache_file=args.cache_dir / "osdf_endpoint_data.pickle") es_args["timeout"] = es_args.pop("es_timeout", None) if not es_args["timeout"]: @@ -549,21 +549,14 @@ def sum_buckets_matching(buckets: dict, pattern: str) -> int: endpoint_data = {"download": [], "upload": []} for transfer_type, transfer_type_data in all_transfer_type_data.items(): for endpoint in transfer_type_data["endpoint"]: - server_info = OSDF_DIRECTOR_SERVERS.get(f"https://{endpoint}") - endpoint_institution = "" - if server_info: - endpoint_name = server_info.get("name") - if endpoint_name: - endpoint_institution = TOPOLOGY_RESOURCE_DATA.get(endpoint_name.lower(), {"institution": f"Unmapped endpoint {endpoint_name}"})["institution"] - else: - endpoint_name = "Unnamed endpoint" - else: - endpoint_name = "Not currently found*" + server_info = OSDF_ENDPOINT_DATA.get(endpoint, {}) + endpoint_name = server_info.get("name", f"{endpoint} not found at director") or f"{endpoint} Not found at director" + endpoint_institution = server_info.get("institution", "Not found at registry") or "Not found at registry" row = { "endpoint": endpoint, "endpoint_institution": endpoint_institution, "endpoint_name": endpoint_name, - "endpoint_type": OSDF_DIRECTOR_SERVERS.get(f"https://{endpoint}", {"type": ""}).get("type", "") or "Cache*", + "endpoint_type": server_info.get("type") or "Cache*", } for attempt_type, attempt_data in { "total_attempts": all_transfer_type_data,