Source code for merge_utils.justin_utils

"""Utility functions for interacting with the JustIN web API."""

import logging
import csv
import asyncio
import requests

from merge_utils import config

# Logger for this module, named after the module itself.
logger = logging.getLogger(name=__name__)

# Endpoint path appended to the configured JustIN base URL to fetch the
# site/storage (RSE) listing as CSV.
SITE_STORAGE_URL = "/api/info/sites_storages.csv"

# Cell values in the JustIN CSV treated as "false". csv yields every cell as a
# string, so plain truthiness would wrongly treat "0" or "False" as true.
_FALSE_FLAG_VALUES = frozenset({"", "0", "false", "f", "no", "n", "none"})


def _csv_flag(value) -> bool:
    """Interpret a CSV cell as a boolean flag (a missing cell counts as False)."""
    if value is None:
        return False
    return value.strip().lower() not in _FALSE_FLAG_VALUES


async def get_site_rse_distances() -> dict[str, dict[str, float]]:
    """
    Retrieve site-RSE distances from the JustIN web API.

    Adds site distance offsets from the config.
    Does NOT add RSE distance offsets, since those are already accounted for by the PathFinder.
    Rows are skipped when the site is disabled, the RSE has neither read nor write
    access, the site's configured offset exceeds ``config.sites.max_distance``,
    or the distance column cannot be parsed as a number.

    :return: dictionary of {rse: {site: distance}} for all reachable site-RSE pairs,
        or an empty dictionary if the JustIN query fails
    """
    # Query JustIN for site-RSE distances
    full_url = str(config.sites.justin_url) + SITE_STORAGE_URL
    try:
        # NOTE(review): TLS certificate verification is disabled (kept from the
        # original); confirm whether the JustIN certificate can be verified.
        res = await asyncio.to_thread(requests.get, full_url, verify=False, timeout=60)
    except requests.RequestException as err:
        # Base class of ConnectionError, Timeout, etc.: any transport failure
        logger.error("JustIN connection error: %s", err)
        return {}
    if not res.ok:
        logger.error("JustIN request to %s failed with HTTP status %s",
                     full_url, res.status_code)
        return {}

    # Parse the CSV response. Field names are given explicitly, so every line
    # is read as data. res.text falls back to a detected encoding when the
    # server declares none, whereas iter_lines(decode_unicode=True) would then
    # yield bytes, which csv cannot read.
    fields = ['site', 'rse', 'dist', 'site_enabled', 'rse_read', 'rse_write']
    reader = csv.DictReader(res.text.splitlines(), fields)
    default_dist = config.sites.site_distances['default']
    distances = {}
    for row in reader:
        # Skip disabled sites and RSEs with no read/write access
        if not _csv_flag(row['site_enabled']):
            continue
        if not _csv_flag(row['rse_read']) and not _csv_flag(row['rse_write']):
            continue
        # Get site distance offset
        site = row['site']
        site_dist = config.sites.site_distances.get(site, default_dist)
        if site_dist > config.sites.max_distance:
            continue
        # Get total distance: JustIN distance scaled by 100, plus the site offset.
        # Short rows give None (TypeError); non-numeric text gives ValueError.
        try:
            distance = 100 * float(row['dist']) + site_dist
        except (TypeError, ValueError):
            logger.warning("Skipping malformed JustIN site-storage row: %s", row)
            continue
        distances.setdefault(row['rse'], {})[site] = distance
    return distances