"""Tools for working with paths and xrootd URLs."""
import os
import sys
import subprocess
import logging
import enum
import asyncio
import collections
from dataclasses import dataclass
from typing import AsyncGenerator
from abc import ABC, abstractmethod
from merge_utils import io_utils, config
from merge_utils.merge_set import MergeSet, MergeFile, MergeFileError
from merge_utils.retriever import MetaRetriever, InputBatch
from merge_utils.rucio_utils import RucioWrapper
logger = logging.getLogger(__name__)
# Utility functions for extracting components from URLs
def get_protocol(url: str) -> str:
    """
    Extract the protocol from a URL or local file path

    :param url: URL of the form 'protocol://host:port/path', or a local file path
    :return: protocol from URL, or 'file' if it is a local file path
    """
    scheme, sep, _ = url.partition("://")
    # No scheme separator means this is a plain local path
    return scheme if sep else "file"
def get_host(url: str) -> str:
    """
    Extract the host from a URL or local file path

    :param url: URL of the form 'protocol://host:port/path', or a local file path
    :return: host from URL, or 'local' if it is a local file path
    """
    if "://" not in url:
        return "local"
    # Network location is the third '/'-separated field; drop any ':port' suffix
    netloc = url.split("/", 3)[2]
    return netloc.partition(":")[0]
def get_port(url: str) -> int:
    """
    Extract the port from a URL or local file path

    :param url: URL of the form 'protocol://host:port/path', or a local file path
    :return: port from URL, or None if it is a local file path or no port is specified
    """
    if "://" not in url:
        return None
    pieces = url.split("/", 3)[2].split(":")
    # Only the 'host:port' form has a second piece
    return int(pieces[1]) if len(pieces) > 1 else None
def get_path(url: str) -> str:
    """
    Extract the path from a URL or local file path

    :param url: URL of the form 'protocol://host:port/path', or a local file path
    :return: path from URL, or the original string if it is a local file path
    """
    if "://" not in url:
        return url
    # Everything after the netloc, restored to an absolute path
    return "/" + url.split("/", 3)[3]
# Utility functions for converting between local paths and xrootd URLs
def path_to_xrootd(path: str) -> str:
    """
    Convert a local file path to an xrootd URL

    :param path: local file path
    :return: xrootd URL corresponding to the local file path, or None if conversion fails
    """
    if not config.local.site:
        return None
    # Mapping of xrootd URL prefix -> local path prefix for this site
    urls = config.local.xrootd[str(config.local.site)]
    # Try the longest URL prefixes first so the most specific mapping wins
    for url_prefix, path_prefix in sorted(urls.items(), key=lambda x: len(x[0]), reverse=True):
        path_prefix = str(path_prefix)
        if path.startswith(path_prefix):
            return path.replace(path_prefix, url_prefix, 1)
    return None
def xrootd_to_path(url: str) -> str:
    """
    Convert an xrootd URL to a local file path

    :param url: xrootd URL
    :return: local file path corresponding to the xrootd URL, or None if conversion fails
    """
    if not config.local.site:
        return None
    # Mapping of xrootd URL prefix -> local path prefix for this site
    urls = config.local.xrootd[str(config.local.site)]
    # Try the longest URL prefixes first so the most specific mapping wins
    for url_prefix, path_prefix in sorted(urls.items(), key=lambda x: len(x[0]), reverse=True):
        path_prefix = str(path_prefix)
        if url.startswith(url_prefix):
            return url.replace(url_prefix, path_prefix, 1)
    return None
# Classes for representing file replicas and their statuses
# NOTE(review): StatusMeta is not defined or imported in this file as shown —
# presumably declared elsewhere in the module; confirm it is in scope.
class Status(enum.Enum, metaclass=StatusMeta):
    """Enumeration of possible file statuses.

    Bad statuses are roughly ordered by severity; Replica.__lt__ compares
    their auto() values, so do not reorder the members below.
    """
    ONLINE = enum.auto()
    NEARLINE = enum.auto()
    OFFLINE = enum.auto()
    UNKNOWN = enum.auto()
    UNREACHABLE = enum.auto()
    MISSING = enum.auto()
    BAD_SIZE = enum.auto()
    BAD_CHECKSUM = enum.auto()
    BAD_PROTOCOL = enum.auto()

    @property
    def good(self) -> bool:
        """Return True if this status indicates a good file replica"""
        return self in {Status.ONLINE, Status.NEARLINE}

    @property
    def bad(self) -> bool:
        """Return True if this status indicates a bad file replica"""
        return not self.good
@dataclass
class Replica:
    """Class representing a file replica, including its path and status."""
    path: str
    rse: 'BaseRSE' = None
    status: Status = Status.UNREACHABLE  # Assume unreachable until we can check otherwise
    distance: float = float('inf')

    @property
    def protocol(self) -> str:
        """Get the protocol of the replica's path"""
        return get_protocol(self.path)

    def __lt__(self, other: 'Replica') -> bool:
        """Sort replicas based on their status and distance"""
        # First sort by good vs bad status: good replicas always come first
        self_good = self.status.good
        other_good = other.status.good
        if self_good != other_good:
            return self_good
        # Bad statuses are roughly ordered by severity (enum declaration order)
        if not self_good and self.status != other.status:
            return self.status.value < other.status.value
        # Otherwise prefer the closer replica
        if self.distance != other.distance:
            return self.distance < other.distance
        # Fall back to the path for a stable, deterministic ordering
        return self.path < other.path

    def __str__(self) -> str:
        if self.distance == float('inf'):
            return f"{self.rse.name}: {self.status.name}"
        return f"{self.rse.name}: {self.status.name} (d = {self.distance})"
# Classes for representing RSEs and checking the status of replicas on those RSEs
class BaseRSE(ABC):
    """Base class for an RSE, which can be used to check the status of file replicas"""

    def __init__(self):
        self.name = None
        self.urls = {}  # protocol -> URL (or local path) prefix
        self.distance = float('inf')
        self.disk = None  # True for disk, False for tape, None if unknown
        self.staging = None  # extra distance penalty for files that need staging
        self.read = True
        self.write = True

    def ping(self) -> float:
        """Get the ping time to the RSE in ms"""
        if len(self.urls) == 0:
            logger.warning("No URLs found for RSE %s, cannot ping", self.name)
            return float('inf')
        if 'file' in self.urls:
            logger.debug("RSE %s is local, skipping ping", self.name)
            return 0.0
        # Try pinging the URLs
        best_ping = float('inf')
        hosts = set(get_host(url) for url in self.urls.values())
        for host in hosts:
            cmd = ['ping', '-c', '1', host]
            ret = subprocess.run(cmd, capture_output=True, check=False)
            if ret.returncode != 0:
                logger.debug("Failed to ping %s", host)
                continue
            # ping's summary line ends "... = min/avg/max/mdev ms"; take the min time
            ping = float(ret.stdout.split()[-2].split(b'/')[0])
            logger.debug("Pinged %s, t = %.1f ms", host, ping)
            best_ping = min(best_ping, ping)
        if best_ping != float('inf'):
            logger.info("Best ping to RSE %s is %.1f ms", self.name, best_ping)
            return best_ping
        # If we get here, all pings failed
        logger.warning("Failed to ping any URLs for RSE %s", self.name)
        return float('inf')

    async def xrdfs(self, replica: Replica, cmd: str, timeout: float = 1) -> str:
        """
        Run an xrdfs command on a replica and return the output
        In case of failure, set the replica status accordingly and return None

        :param replica: Replica object to check
        :param cmd: xrdfs command to run (e.g. 'ls -l')
        :param timeout: timeout for the xrdfs command in seconds
        :return: stdout of the xrdfs command, or None if it failed
        """
        protocol = replica.protocol
        if protocol != 'root':
            raise ValueError(f"Unsupported protocol for xrdfs: {protocol}")
        host = get_host(replica.path)
        port = get_port(replica.path)
        path = get_path(replica.path)
        url = f"{protocol}://{host}:{port}"
        full_cmd = ['xrdfs', url] + cmd.split() + [path]
        try:
            ret = await asyncio.to_thread(subprocess.run, full_cmd, capture_output=True,
                                          text=True, check=False, timeout=timeout)
        except subprocess.TimeoutExpired:
            logger.debug("Timeout accessing xrootd server %s", host)
            replica.status = Status.UNREACHABLE
            return None
        if ret.returncode != 0:
            replica.status = Status.UNREACHABLE
            # xrdfs exit codes: 51 = invalid server, 52 = auth failure, 54 = no such file
            if ret.returncode == 51:
                logger.debug("Invalid xrootd server %s", host)
            elif ret.returncode == 52:
                logger.debug("Auth failed for xrootd server %s", host)
            elif ret.returncode == 54:
                logger.debug("No such file %s", replica.path)
                replica.status = Status.MISSING
            else:
                logger.debug("Failed to access %s\n  %s", replica.path, ret.stderr.strip())
            return None
        return ret.stdout.strip()

    async def checksum_xrootd(self, replica: Replica, cksums: dict) -> bool:
        """
        Check the checksums of a remote file against expected values

        :param replica: Replica object to check
        :param cksums: dict of {algorithm: expected_checksum} pairs to check against
        :return: True if any matching checksums are found, False otherwise
        """
        logger.debug("RSE %s Checking xrootd checksums for file %s", self.name, replica.path)
        xrdfs_cksums = await self.xrdfs(replica, 'query checksum')
        if xrdfs_cksums is None:
            logger.warning("Failed to get checksums for file %s", replica.path)
            return False
        unknown_algos = set()
        for line in xrdfs_cksums.splitlines():
            algo, cksum = line.split()
            if algo not in cksums:
                unknown_algos.add(algo)
                continue
            if cksum == cksums[algo]:
                logger.debug("Checksum %s matches for file %s", algo, replica.path)
                return True
            # A known algorithm with a mismatched value means the file is bad
            logger.warning("File %s has bad %s checksum: %s != %s",
                           replica.path, algo, cksum, cksums[algo])
            return False
        if unknown_algos:
            logger.info("File %s has checksums for unknown algorithms: %s",
                        replica.path, unknown_algos)
        logger.warning("Checksum failed for file %s", replica.path)
        return False

    async def checksum_local(self, path: str, cksums: dict) -> bool:
        """
        Check the checksums of a local file against expected values

        :param path: path to the local file
        :param cksums: dict of {algorithm: expected_checksum} pairs to check against
        :return: True if any matching checksums are found, False otherwise
        """
        logger.debug("RSE %s Checking local checksums for file %s", self.name, path)
        # (algorithm name, command that prints "<checksum> <file>") pairs
        algorithms = [
            ('adler32', 'xrdadler32'),
            ('md5', 'md5sum'),
            ('sha256', 'sha256sum'),
            ('sha512', 'sha512sum')
        ]
        # Check known algorithms
        for algo, cmd in algorithms:
            if algo not in cksums:
                continue
            expected = cksums[algo]
            ret = await asyncio.to_thread(subprocess.run, [cmd, path],
                                          capture_output=True, text=True, check=False)
            if ret.returncode != 0:
                logger.debug("Failed to run %s for file %s: %s", cmd, path, ret.stderr.strip())
                continue
            actual = ret.stdout.strip().split()[0]
            if actual == expected:
                return True
            logger.warning("File %s has bad %s checksum: %s != %s",
                           path, algo, actual, expected)
            return False
        # Failed to verify any checksums
        checked = set(a[0] for a in algorithms)
        unchecked = set(cksums.keys()) - checked
        if unchecked:
            logger.debug("Missing commands for checksum algorithms: %s", ', '.join(unchecked))
        logger.warning("Checksum failed for file %s", path)
        return False

    async def cache_xrootd(self, replica: Replica, timeout: float = 1) -> None:
        """
        Check if a replica is online or nearline by using gfal-xattr to query user.status

        :param replica: Replica object to check
        :param timeout: timeout for the gfal-xattr command in seconds
        """
        logger.debug("RSE %s Checking xrootd cache status for file %s", self.name, replica.path)
        # Skip cache check if the distance is already too high
        if replica.distance > config.sites.max_distance:
            replica.status = Status.UNREACHABLE
            return
        # Assume nearline unless we can confirm it is online
        replica.status = Status.NEARLINE
        # BUG FIX: subprocess argv entries must be strings, not ints
        cmd = ['gfal-xattr', '-t', str(int(timeout)), replica.path, 'user.status']
        try:
            ret = await asyncio.to_thread(subprocess.run, cmd, capture_output=True,
                                          text=True, check=False, timeout=timeout + 1)
        except subprocess.TimeoutExpired:
            logger.debug("Timeout running gfal-xattr on %s", replica.path)
            return
        if ret.returncode != 0:
            # code 110 is timeout
            logger.debug("Failed to run gfal-xattr on %s\n  %s", replica.path, ret.stderr.strip())
            return
        status = ret.stdout.strip()
        if status == 'UNKNOWN':
            logger.info("Got UNKNOWN status for %s, assuming NEARLINE", replica.path)
            return
        # If we got a valid status, set it on the replica
        replica.status = Status[status]

    def cache_local(self, replica: Replica) -> None:
        """
        Check if a replica is online or nearline by reading the locality stat file

        :param replica: Replica object to check
        """
        logger.debug("RSE %s Checking local cache status for file %s", self.name, replica.path)
        directory, filename = os.path.split(replica.path)
        # dCache exposes the file locality via the magic ".(get)(<name>)(locality)" path
        # (fix: use the actual file name here — 'filename' was unpacked but unused)
        stat_file = f"{directory}/.(get)({filename})(locality)"
        if not os.path.exists(stat_file):
            # normal file not in DCACHE?
            replica.status = Status.ONLINE
            return
        with open(stat_file, encoding="utf-8") as stats:
            status = stats.readline().strip()
        replica.status = Status[status]

    async def check_cache(self, replica: Replica) -> None:
        """
        Check if a replica is online or nearline using the appropriate method

        :param replica: Replica object to check
        """
        logger.debug("RSE %s Checking cache status for file %s", self.name, replica.path)
        # For non-dcache RSEs, set status to ONLINE or NEARLINE depending on tape vs disk type
        if self.staging is None:
            replica.status = Status.ONLINE if self.disk else Status.NEARLINE
            return
        # For dcache RSEs, skip the cache check if there is no staging penalty
        if self.staging <= 0:
            replica.status = Status.ONLINE
            return
        # Check the cache status of the file
        if replica.protocol == 'file':
            logger.debug("RSE %s checking local cache for file %s", self.name, replica.path)
            await asyncio.to_thread(self.cache_local, replica)
        else:
            logger.debug("RSE %s checking xrootd cache for file %s", self.name, replica.path)
            await self.cache_xrootd(replica)
        # If the file is not online, add the staging penalty to the distance
        if replica.status != Status.ONLINE:
            replica.distance += self.staging

    async def check_local(self, replica: Replica, size: int = None, cksums: dict = None):
        """
        Check the status of a local file replica

        :param replica: Replica object to check
        :param size: optionally check the file size against an expected value
        :param cksums: optionally check the file checksums against a dict of {algorithm: checksum}
        """
        # Make sure the file exists and is readable
        if not await asyncio.to_thread(os.path.isfile, replica.path):
            replica.status = Status.MISSING
            return
        if not await asyncio.to_thread(os.access, replica.path, os.R_OK):
            replica.status = Status.OFFLINE
            return
        # Check file size and checksums if we have expected values
        if size and await asyncio.to_thread(os.path.getsize, replica.path) != size:
            replica.status = Status.BAD_SIZE
            return
        if cksums and not await self.checksum_local(replica.path, cksums):
            replica.status = Status.BAD_CHECKSUM
            return
        # Check the cache status of the file
        await self.check_cache(replica)

    async def check_xrootd(self, replica: Replica, size: int = None, cksums: dict = None):
        """
        Check the status of a remote file replica accessed via xrootd

        :param replica: Replica object to check
        :param size: optionally check the file size against an expected value
        :param cksums: optionally check the file checksums against a dict of {algorithm: checksum}
        """
        # If we have an expected size, make sure the file exists and matches that size
        if size:
            ls = await self.xrdfs(replica, 'ls -l')
            if ls is None:
                return
            ls_perm, _, _, ls_size, _ = ls.split()  # permissions, date, time, size, name
            # Make sure the file is readable
            if ls_perm[1] != 'r':
                logger.debug("File %s is not readable", replica.path)
                return
            # Check the size (BUG FIX: ls output fields are strings, compare as int)
            if int(ls_size) != size:
                replica.status = Status.BAD_SIZE
                return
        # Check the checksums, if we have expected values
        if cksums and not await self.checksum_xrootd(replica, cksums):
            replica.status = Status.BAD_CHECKSUM
            return
        # Check the cache status of the file
        await self.check_cache(replica)

    async def check(self, replica: Replica, size: int = None, cksums: dict = None):
        """
        Check the status of a file replica on the RSE

        :param replica: Replica object to check
        :param size: optionally check the file size against an expected value
        :param cksums: optionally check the file checksums against a dict of {algorithm: checksum}
        """
        logger.debug("RSE %s checking replica %s", self.name, replica.path)
        replica.distance = self.distance
        # Don't bother checking bad RSEs
        if self.distance > config.sites.max_distance:
            logger.debug("RSE %s is too far away (d = %d)", self.name, self.distance)
            replica.status = Status.UNREACHABLE
            return
        if self.read is False:
            logger.debug("RSE %s is not readable", self.name)
            replica.status = Status.OFFLINE
            return
        # Check replica using the appropriate method based on the protocol
        protocol = replica.protocol
        # For local files, check directly but try to convert to xrootd URL if possible
        if protocol == 'file':
            logger.debug("RSE %s checking local replica %s", self.name, replica.path)
            await self.check_local(replica, size=size, cksums=cksums)
            # Advertise the xrootd URL for files verified locally.
            # BUG FIX: the protocol key used everywhere else is 'root', not 'xrootd',
            # so the old "'xrootd' in self.urls" test could never succeed.
            if 'root' in self.urls and 'file' in self.urls:
                replica.path = replica.path.replace(self.urls['file'], self.urls['root'], 1)
            return
        # For local RSEs, get local path and check directly
        if 'file' in self.urls:
            url = replica.path
            replica.path = url.replace(self.urls[protocol], self.urls['file'], 1)
            logger.debug("RSE %s converting to local path %s", self.name, replica.path)
            await self.check_local(replica, size=size, cksums=cksums)
            # Restore the original URL after checking the local copy
            replica.path = url
            return
        # For xrootd files, check using xrdfs and gfal-xattr
        if protocol == 'root':
            logger.debug("RSE %s checking xrootd replica %s", self.name, replica.path)
            await self.check_xrootd(replica, size=size, cksums=cksums)
            return
        # If we get here, we don't know how to check this replica
        logger.debug("Unsupported protocol %s for replica %s", protocol, replica.path)
        replica.status = Status.BAD_PROTOCOL
        return
class GenericRSE(BaseRSE):
    """Class to store information about an unknown (non-Rucio) RSE"""

    def __init__(self, url: str = None, name: str = None):
        """
        :param url: URL or local path prefix for the RSE (optional if name is given)
        :param name: RSE name to look up in the dcache config (optional if url is given)
        """
        super().__init__()
        # If we have a name, check config for info about the RSE
        if name:
            if name in config.sites.dcache:
                url = config.sites.dcache[name]['url']
                self.staging = float(config.sites.dcache[name]['staging'])
            if not url:
                raise ValueError(f"No URL found for RSE {name} in config")
        elif url is None:
            raise ValueError("Must provide either a name or url for the RSE")
        # Otherwise, use URL host name for the RSE
        host = get_host(url)
        self.name = name or host
        # Set url
        protocol = get_protocol(url)
        self.urls[protocol] = url
        # Try to convert file paths to xrootd URLs and vice versa
        path = None
        if protocol == 'file':
            path = url
            url = path_to_xrootd(path)
            if url:
                self.urls['root'] = url
                # Prefer the xrootd host for naming when no name was given
                host = get_host(url)
                if name is None:
                    self.name = host
        elif protocol == 'root':
            path = xrootd_to_path(url)
            if path:
                self.urls['file'] = path
        else:
            logger.debug("Creating RSE %s with unknown protocol %s", self.name, protocol)
        # Check for distance based on RSE name, full URL, then host name
        if name and name in config.sites.rse_distances:
            self.distance = float(config.sites.rse_distances[name])
            logger.debug("Found distance for RSE %s based on name: %s", self.name, self.distance)
        elif url in config.sites.rse_distances:
            self.distance = float(config.sites.rse_distances[url])
            logger.debug("Found distance for RSE %s based on URL: %s", self.name, self.distance)
        elif host in config.sites.rse_distances:
            self.distance = float(config.sites.rse_distances[host])
            logger.debug("Found distance for RSE %s based on host: %s", self.name, self.distance)
        # If we don't have a distance, use the staging penalty to represent tape vs disk
        else:
            logger.debug("Using default distances for RSE %s", self.name)
            self.distance = float(config.sites.rse_distances['disk'])
            if self.staging is None:
                self.staging = float(config.sites.rse_distances['tape']) - self.distance
        # Add ping time to the distance
        self.distance += self.ping()
class RucioRSE(BaseRSE):
    """Class to store information about a Rucio RSE"""

    def __init__(self, info: dict):
        """
        :param info: RSE description dictionary as returned by Rucio
        """
        super().__init__()
        self.name = info['rse']
        logger.debug("Initializing Rucio RSE %s", self.name)
        # A deleted RSE is neither readable nor writable
        self.read = info['availability_read'] and not info.get('deleted', False)
        self.write = info['availability_write'] and not info.get('deleted', False)
        if self.name in config.sites.dcache:
            # dcache RSEs get a staging penalty and are treated as disk
            self.staging = float(config.sites.dcache[self.name]['staging'])
            logger.debug("RSE %s is dcache with staging penalty %.0f", self.name, self.staging)
            self.disk = True
        elif info['rse_type'] == 'DISK':
            self.disk = True
        elif info['rse_type'] == 'TAPE':
            self.disk = False
        else:
            logger.error("RSE %s has unknown type %s", self.name, info['rse_type'])
            self.disk = None
        self.set_distance()
        self.set_urls(info['protocols'])

    def set_distance(self) -> None:
        """Get the distance offset for this RSE from the config"""
        if self.name in config.sites.rse_distances:
            self.distance = float(config.sites.rse_distances[self.name])
        elif self.disk is True:
            self.distance = float(config.sites.rse_distances['disk'])
        elif self.disk is False:
            self.distance = float(config.sites.rse_distances['tape'])
        else:
            # Unknown RSE type: treat as unreachable
            self.distance = float('inf')
        logger.debug("Set distance for RSE %s to %.0f", self.name, self.distance)

    def set_urls(self, protocols: list) -> None:
        """
        Get the URL prefixes for this RSE from the protocols list

        :param protocols: list of protocol dictionaries from Rucio
        """
        for proto in protocols:
            scheme = proto['scheme']
            url = f"{scheme}://{proto['hostname']}:{proto['port']}{proto['prefix']}"
            self.urls[scheme] = url
            # Try converting xrootd URLs to local paths
            if scheme == 'root':
                path = xrootd_to_path(url)
                if path:
                    self.urls['file'] = path
# PathFinder classes
class PathFinder(MetaRetriever):
    """Base class for finding paths to files"""
    name: str = "replicas"
    file_owner: bool = False

    def __init__(self, meta: MetaRetriever):
        """
        :param meta: upstream metadata retriever providing the files to locate
        """
        super().__init__()
        self.meta = meta
        self.client = RucioWrapper()
        self.rses = {}  # cache of known RSEs (keying scheme varies by subclass)
        self.replica_queue = None  # created on connect()
        self.workers = []  # replica_checker tasks

    @property
    def files(self) -> MergeSet:
        """Return the set of files from the source"""
        return self.meta.files

    async def replica_checker(self) -> None:
        """Asynchronous worker method to check the status of replicas from the replica queue"""
        while True:
            # Wait for a replica to check from the queue
            job = await self.replica_queue.get()
            # If we get a None job, it means we should stop the checker
            if job is None:
                self.replica_queue.task_done()
                break
            # Check the replica and mark the job as done
            replica, size, cksums = job
            await replica.rse.check(replica, size=size, cksums=cksums)
            self.replica_queue.task_done()

    async def check_replica(self, replica: Replica, size: int = None, cksums: dict = None) -> None:
        """
        Add a replica to the replica queue for asynchronous checking

        :param replica: Replica object to check
        :param size: optionally check the file size against an expected value
        :param cksums: optionally check the file checksums against a dict of {algorithm: checksum}
        """
        logger.debug("Queueing replica %s on RSE %s for checking", replica.path, replica.rse.name)
        await self.replica_queue.put((replica, size, cksums))

    async def connect(self) -> None:
        """Connect to the file source and rucio, and start the replica checkers"""
        await asyncio.gather(self.meta.connect(), self.client.connect())
        self.replica_queue = asyncio.Queue()
        for _ in range(int(config.validation.concurrency)):
            worker = asyncio.create_task(self.replica_checker())
            self.workers.append(worker)

    async def disconnect(self) -> None:
        """Disconnect from the file source and rucio, and stop the replica checkers"""
        await asyncio.gather(self.meta.disconnect(), self.client.disconnect())
        # Stop the replica checkers by sending one sentinel per worker
        for _ in self.workers:
            await self.replica_queue.put(None)
        await asyncio.gather(*self.workers)

    @abstractmethod
    async def add_replica(self, file: MergeFile, path: str, rse_name: str = None) -> None:
        """
        Add a replica to a MergeFile object, including its path and RSE information

        :param file: MergeFile object to add the replica to
        :param path: file path for the replica
        :param rse_name: optional RSE name; if not provided, it will be inferred from the path
        """

    @abstractmethod
    async def get_paths(self, batch: InputBatch) -> list:
        """
        Asynchronously retrieve paths for a specific batch of files.

        :param batch: InputBatch object containing files to retrieve paths for
        :return: list of file path dictionaries
        """

    @abstractmethod
    async def set_paths(self, batch: InputBatch, paths: list) -> None:
        """
        Asynchronously set paths for a specific batch of files.

        :param batch: InputBatch object containing files to process
        :param paths: list of file path dictionaries to use for setting paths
        """
class RucioFinder(PathFinder):
    """Class for managing asynchronous queries to the Rucio web API."""
    name: str = "rucio"

    async def connect(self) -> None:
        """Connect to the file source and rucio"""
        await super().connect()
        if not self.client:
            logger.critical("Failed to connect to Rucio client")
            sys.exit(1)

    @staticmethod
    def _mismatch_level() -> int:
        """Log level for metadata mismatches: CRITICAL if configured to quit on unreachable files."""
        if config.validation.error_handling.unreachable == 'quit':
            return logging.CRITICAL
        return logging.ERROR

    async def add_replica(self, file: MergeFile, path: str, rse_name: str = None) -> None:
        """
        Add a replica to a MergeFile object

        :param file: MergeFile object to add the replica to
        :param path: file path for the replica
        :param rse_name: RSE name for the replica (required)
        """
        if not rse_name:
            raise ValueError("RucioFinder requires an RSE name to add a replica")
        rse = self.rses.get(rse_name)
        if not rse:
            rse_info = await self.client.get_rse(rse_name)
            if not rse_info:
                logger.critical("RSE %s not found in Rucio, cannot add replica", rse_name)
                sys.exit(1)
            rse = RucioRSE(rse_info)
            self.rses[rse_name] = rse
        replica = Replica(path=path, rse=rse)
        file.replicas.append(replica)
        # Assume Rucio has already validated replica size and checksums
        #await self.check_replica(replica, size=file.size, cksums=file.checksums)
        await self.check_replica(replica)

    async def checksum(self, file: MergeFile, rucio: dict) -> bool:
        """
        Ensure file sizes and checksums from Rucio agree with the input metadata.

        :param file: MergeFile object to check
        :param rucio: Rucio replicas dictionary
        :return: True if files match, False otherwise
        """
        # Check the file size
        if file.size != rucio['bytes']:
            logger.log(self._mismatch_level(), "Size mismatch for %s: %d != %d",
                       file.did, file.size, rucio['bytes'])
            return False
        # See if we should skip the checksum check
        if len(config.validation.checksums) == 0:
            return True
        # Check the checksums
        for algo in config.validation.checksums:
            algo = str(algo)
            if algo in file.checksums and algo in rucio:
                csum1 = file.checksums[algo]
                csum2 = rucio[algo]
                if csum1 == csum2:
                    logger.debug("Found matching %s checksum for %s", algo, file.did)
                    return True
                logger.log(self._mismatch_level(), "%s checksum err for %s: %s != %s",
                           algo, file.did, csum1, csum2)
                return False
            if algo not in file.checksums:
                logger.debug("MetaCat missing %s checksum for %s", algo, file.did)
            if algo not in rucio:
                logger.debug("Rucio missing %s checksum for %s", algo, file.did)
        # If we get here, we have no matching checksums
        logger.log(self._mismatch_level(), "No matching checksums for %s", file.did)
        return False

    async def get_paths(self, batch: InputBatch) -> list:
        """
        Asynchronously retrieve paths for a specific batch of files.

        :param batch: InputBatch object containing files to retrieve paths for
        :return: list of file path dictionaries
        """
        return await self.client.get_replicas(batch.files)

    async def set_paths(self, batch: InputBatch, paths: list) -> None:
        """
        Asynchronously set paths for a specific batch of files.

        :param batch: InputBatch object containing files to process
        :param paths: list of file path dictionaries from Rucio
        """
        dids = {f.did: f for f in batch.files}
        for replicas in paths:
            did = replicas['scope'] + ':' + replicas['name']
            pfns = replicas.get('pfns', {})
            count = len(pfns)
            logger.debug("Found %d replicas for %s", count, did)
            if count == 0:
                continue
            # NOTE(review): assumes Rucio only returns dids from this batch — a
            # did outside the batch would raise KeyError here; confirm upstream.
            file = dids[did]
            if not await self.checksum(file, replicas):
                continue
            for pfn, info in pfns.items():
                rse = info['rse']
                await self.add_replica(file, pfn, rse_name=rse)
class PathListFinder(PathFinder):
    """Class for finding paths from a list of explicit file paths"""
    name: str = "replica_list"

    def __init__(self, source: MetaRetriever, paths: dict = None):
        """
        :param source: upstream metadata retriever
        :param paths: optional mapping of file name -> collection of explicit paths
        """
        super().__init__(source)
        self.paths = paths or {}

    def add_rse(self, rse: BaseRSE) -> None:
        """Add an RSE to the list of known RSEs"""
        # Here self.rses maps protocol -> {url_prefix: rse}
        for protocol, url in rse.urls.items():
            rses = self.rses.setdefault(protocol, {})
            rses[url] = rse

    async def connect(self) -> None:
        """Connect to the file source and rucio"""
        await super().connect()
        # If we have access to Rucio, get the list of RSEs
        if self.client:
            async for rse_info in self.client.get_rses():
                self.add_rse(RucioRSE(rse_info))
            return
        # If this is a batch job, we need Rucio
        if not config.output.local:
            logger.critical("Failed to connect to Rucio client")
            sys.exit(1)
        # For local jobs, fall back to generic RSEs for DCACHE locations
        for name in config.sites.dcache.keys():
            self.add_rse(GenericRSE(name=name))
        # Also check for path-like keys in the distance config to create generic RSEs for those
        for url in config.sites.rse_distances.keys():
            if '/' not in url:
                continue
            self.add_rse(GenericRSE(url=url))

    async def add_replica(self, file: MergeFile, path: str, rse_name: str = None) -> None:
        """
        Add a replica to a MergeFile object

        :param file: MergeFile object to add the replica to
        :param path: file path for the replica
        :param rse_name: RSE name for the replica (unused; inferred from the path)
        """
        protocol = get_protocol(path)
        # Make sure local paths are absolute and expanded
        if protocol == 'file':
            path = io_utils.expand_path(path)
        # Try to find an existing RSE that matches the path prefix (longest prefix wins)
        rses = self.rses.setdefault(protocol, {})
        rse = None
        for prefix, candidate in sorted(rses.items(), key=lambda x: len(x[0]), reverse=True):
            if path.startswith(prefix):
                rse = candidate
                break
        # Add a new RSE for this path if we don't have a match
        if not rse:
            if protocol == 'file':
                # Use the top-level directory as the prefix
                # (BUG FIX: this literal was missing its f-string prefix)
                prefix = f"/{path.split('/', 2)[1]}/"
            else:
                host = get_host(path)
                port = get_port(path)
                # BUG FIX: omit the port when the URL does not specify one,
                # instead of building an unmatchable "host:None" prefix
                netloc = f"{host}:{port}" if port is not None else host
                prefix = f"{protocol}://{netloc}/"
            rse = GenericRSE(url=prefix)
            self.add_rse(rse)
        # Add the replica to the file
        replica = Replica(path=path, rse=rse)
        file.replicas.append(replica)
        await self.check_replica(replica, size=file.size, cksums=file.checksums)

    async def get_paths(self, batch: InputBatch) -> list:
        """
        Asynchronously retrieve paths for a specific batch of files.

        :param batch: InputBatch object containing files to retrieve paths for
        :return: list of file path dictionaries
        """
        paths = []
        for file in batch.files:
            name = file.name
            # Get any explicit paths provided for this file
            file_paths = list(self.paths.get(name, []))
            # If we have search directories, look for a matching file in those as well
            for search_dir in config.input.search_dirs:
                search_path = os.path.join(search_dir, name)
                if os.path.isfile(search_path):
                    file_paths.append(search_path)
            # If we have any paths for this file, add them to the list of paths to return
            if file_paths:
                paths.append({name: file_paths})
        return paths

    async def set_paths(self, batch: InputBatch, paths: list) -> None:
        """
        Asynchronously set paths for a specific batch of files.

        :param batch: InputBatch object containing files to process
        :param paths: list of file path dictionaries from get_paths
        """
        # Consolidate list of paths into single dictionary
        path_dict = {}
        for file in paths:
            path_dict.update(file)
        # Assign paths to files
        for file in batch.files:
            name = file.name
            replicas = path_dict.get(name, [])
            count = len(replicas)
            logger.debug("Found %d replicas for %s", count, file.did)
            if count == 0:
                continue
            for path in replicas:
                await self.add_replica(file, path)
def get(metadata: MetaRetriever) -> PathFinder:
    """
    Create and return a physical path finder:
      PathListFinder if any data files were provided in files input mode
      PathListFinder if any search directories were provided in other input modes
      RucioFinder if no data files or search directories were provided

    :param metadata: upstream metadata retriever for the files to locate
    :return: PathFinder object for finding file locations
    """
    # First check for explicit file paths in files input mode
    if config.input.mode == 'files':
        # Group data file paths by name
        paths = collections.defaultdict(set)
        for path in config.input.inputs:
            path = str(path)
            # If we have a JSON file, strip the extension and look for a matching data file
            if path.endswith('.json'):
                path = path.removesuffix('.json')
            if not os.path.isfile(path):
                continue
            name = os.path.basename(path)
            paths[name].add(path)
        if paths:
            return PathListFinder(metadata, paths)
    # Also return a PathListFinder if we have local search directories
    if config.input.search_dirs:
        return PathListFinder(metadata)
    # Otherwise, we need to query Rucio to find the file paths
    return RucioFinder(metadata)