replicas

Tools for working with paths and xrootd URLs.

class replicas.BaseRSE

Base class for an RSE (Rucio Storage Element), which can be used to check the status of file replicas

cache_local(replica: Replica) → None

Check if a replica is online or nearline by reading the locality stat file

Parameters:

replica – Replica object to check
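The "locality stat file" is presumably dCache's `.(get)(<filename>)(locality)` dot-file, which reports ONLINE, NEARLINE or ONLINE_AND_NEARLINE when read through an NFS mount. A minimal sketch under that assumption; `locality_path` and `read_locality` are hypothetical helpers, not part of this module:

```python
from __future__ import annotations

import os


def locality_path(path: str) -> str:
    """Build the dCache locality dot-file path for a file (assumed convention)."""
    directory, name = os.path.split(path)
    return os.path.join(directory, f".(get)({name})(locality)")


def read_locality(path: str) -> str | None:
    """Return the locality string (e.g. 'NEARLINE'), or None if it cannot be read."""
    try:
        with open(locality_path(path)) as f:
            return f.read().strip()
    except OSError:
        return None
```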

async cache_xrootd(replica: Replica, timeout: float = 1) → None

Check if a replica is online or nearline by using gfal-xattr to query user.status

Parameters:

replica – Replica object to check

async check(replica: Replica, size: int | None = None, cksums: dict | None = None)

Check the status of a file replica on the RSE

Parameters:
  • replica – Replica object to check

  • size – optionally check the file size against an expected value

  • cksums – optionally check the file checksums against a dict of {algorithm: checksum}

async check_cache(replica: Replica) → None

Check if a replica is online or nearline using the appropriate method

Parameters:

replica – Replica object to check

async check_local(replica: Replica, size: int | None = None, cksums: dict | None = None)

Check the status of a local file replica

Parameters:
  • replica – Replica object to check

  • size – optionally check the file size against an expected value

  • cksums – optionally check the file checksums against a dict of {algorithm: checksum}

async check_xrootd(replica: Replica, size: int | None = None, cksums: dict | None = None)

Check the status of a remote file replica accessed via xrootd

Parameters:
  • replica – Replica object to check

  • size – optionally check the file size against an expected value

  • cksums – optionally check the file checksums against a dict of {algorithm: checksum}
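`check` presumably routes each replica to `check_local` or `check_xrootd` according to its protocol. A self-contained sketch of that dispatch, using stand-in coroutines in place of the real methods and an assumed mapping ('file' for plain local paths, 'root' or 'xroot' for xrootd):

```python
import asyncio
from urllib.parse import urlsplit


def protocol_of(path: str) -> str:
    """Return the URL scheme, or 'file' for a plain local path."""
    return urlsplit(path).scheme or "file"


async def check_local(path, size=None, cksums=None):
    return f"local:{path}"  # stand-in for the real local check


async def check_xrootd(path, size=None, cksums=None):
    return f"xrootd:{path}"  # stand-in for the real xrootd check


async def check(path, size=None, cksums=None):
    """Dispatch to the check that matches the path's protocol."""
    protocol = protocol_of(path)
    if protocol == "file":
        return await check_local(path, size, cksums)
    if protocol in ("root", "xroot"):
        return await check_xrootd(path, size, cksums)
    raise ValueError(f"unsupported protocol: {protocol}")
```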

async checksum_local(path: str, cksums: dict) → bool

Check the checksums of a local file against expected values

Parameters:
  • path – path to the local file

  • cksums – dict of {algorithm: expected_checksum} pairs to check against

Returns:

True if any matching checksums are found, False otherwise
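A sketch of a local checksum comparison, assuming the algorithms of interest are adler32 and md5 (the two Rucio records) and that adler32 is compared as eight zero-padded lowercase hex digits. It streams the file in chunks so large files are not read into memory at once; an async caller could run it via `asyncio.to_thread`. The helper names are hypothetical:

```python
import hashlib
import zlib


def compute_checksums(path: str, chunk_size: int = 1 << 20) -> dict:
    """Compute adler32 and md5 of a file, reading it in chunks."""
    adler = 1  # adler32 starting value
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            adler = zlib.adler32(chunk, adler)
            md5.update(chunk)
    return {"adler32": f"{adler:08x}", "md5": md5.hexdigest()}


def checksums_match(path: str, cksums: dict) -> bool:
    """True if any expected checksum matches the computed one (case-insensitive)."""
    actual = compute_checksums(path)
    return any(
        alg in actual and actual[alg] == str(value).lower()
        for alg, value in cksums.items()
    )
```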

async checksum_xrootd(replica: Replica, cksums: dict) → bool

Check the checksums of a remote file against expected values

Parameters:
  • replica – Replica object to check

  • cksums – dict of {algorithm: expected_checksum} pairs to check against

Returns:

True if any matching checksums are found, False otherwise
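A remote checksum can be requested with `xrdfs <host> query checksum <path>`, which prints the algorithm name followed by the value. This sketch only parses and compares that output, assuming the `'<algorithm> <value>'` format; the helper names are hypothetical:

```python
from __future__ import annotations


def parse_xrootd_checksum(output: str) -> tuple[str, str] | None:
    """Split '<algorithm> <value>' output into a lowercase (algorithm, value) pair."""
    parts = output.split()
    if len(parts) < 2:
        return None
    return parts[0].lower(), parts[1].lower()


def remote_checksum_matches(output: str, cksums: dict) -> bool:
    """True if the reported checksum equals the expected value for that algorithm."""
    parsed = parse_xrootd_checksum(output)
    if parsed is None:
        return False
    alg, value = parsed
    expected = {k.lower(): str(v).lower() for k, v in cksums.items()}
    return expected.get(alg) == value
```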

ping() → float

Get the ping time to the RSE in ms
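This page does not say how the ping time is measured. One common approximation, swapped in here purely as an illustration, is to time a TCP connection to the RSE's host and port; `tcp_ping_ms` is hypothetical, and returning `inf` on failure mirrors the `Replica` default distance:

```python
import socket
import time


def tcp_ping_ms(host: str, port: int, timeout: float = 1.0) -> float:
    """Time a TCP connect to host:port in milliseconds; inf if it fails."""
    start = time.perf_counter()
    try:
        with socket.create_connection((host, port), timeout=timeout):
            pass
    except OSError:  # refused, unreachable or timed out
        return float("inf")
    return (time.perf_counter() - start) * 1000.0
```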

async xrdfs(replica: Replica, cmd: str, timeout: float = 1) → str

Run an xrdfs command on a replica and return its output. On failure, set the replica status accordingly and return None.

Parameters:
  • replica – Replica object to check

  • cmd – xrdfs command to run (e.g. ‘ls -l’)

  • timeout – timeout for the xrdfs command in seconds

Returns:

stdout of the xrdfs command, or None if it failed
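A sketch of running a command asynchronously with a timeout and mapping every failure to None, as described above. It takes a host and path instead of a Replica, and the `xrdfs <host> <cmd...> <path>` argument order is an assumption; setting the replica status is left out:

```python
from __future__ import annotations

import asyncio


async def run_with_timeout(argv: list[str], timeout: float = 1) -> str | None:
    """Run a command; return its stdout, or None on error, non-zero exit or timeout."""
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError:  # e.g. the executable is not installed
        return None
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # the process already exited
        await proc.wait()
        return None
    if proc.returncode != 0:
        return None
    return stdout.decode()


async def xrdfs(host: str, cmd: str, path: str, timeout: float = 1) -> str | None:
    """Run e.g. `xrdfs <host> ls -l <path>` (hypothetical wrapper)."""
    return await run_with_timeout(["xrdfs", host, *cmd.split(), path], timeout)
```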

class replicas.GenericRSE(url: str | None = None, name: str | None = None)

Class to store information about an unknown RSE

class replicas.PathFinder(meta: MetaRetriever)

Base class for finding paths to files

abstract async add_replica(file: MergeFile, path: str, rse_name: str | None = None) → None

Add a replica to a MergeFile object, including its path and RSE information

Parameters:
  • file – MergeFile object to add the replica to

  • path – file path for the replica

  • rse_name – optional RSE name; if not provided, it will be inferred from the path

async check_replica(replica: Replica, size: int | None = None, cksums: dict | None = None) → None

Add a replica to the replica queue for asynchronous checking

Parameters:
  • replica – Replica object to check

  • size – optionally check the file size against an expected value

  • cksums – optionally check the file checksums against a dict of {algorithm: checksum}

async connect() → None

Connect to the file source and Rucio

async disconnect() → None

Disconnect from the file source and Rucio, and stop the replica checkers

property files: MergeSet

Return the set of files from the source

async get_metadata(batch: InputBatch, limit: int) → list

Asynchronously retrieve metadata for a specific batch of files.

Parameters:
  • batch – empty InputBatch object with the skip index set

  • limit – maximum number of files to retrieve

Returns:

list of file metadata dictionaries

abstract async get_paths(batch: InputBatch) → list

Asynchronously retrieve paths for a specific batch of files.

Parameters:

batch – InputBatch object containing files to retrieve paths for

Returns:

list of file path dictionaries

async input_batches() → AsyncGenerator[InputBatch, None]

Asynchronously retrieve paths for successive batches of files.

Yields:

InputBatch object containing skip index and list of MergeFile objects
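The skip index suggests offset-based paging: each batch records how many files precede it, and `get_metadata` fetches at most `limit` files from that offset. A minimal sketch of that loop as an async generator; the `fetch(skip, limit)` coroutine stands in for the metadata query and `(skip, items)` tuples stand in for InputBatch, both assumptions:

```python
from __future__ import annotations

from typing import AsyncGenerator, Awaitable, Callable


async def paged_batches(
    fetch: Callable[[int, int], Awaitable[list]], limit: int
) -> AsyncGenerator[tuple[int, list], None]:
    """Yield (skip, items) pages until fetch returns an empty page."""
    skip = 0
    while True:
        items = await fetch(skip, limit)
        if not items:
            return
        yield skip, items
        skip += len(items)
```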

async replica_checker() → None

Asynchronous worker method to check the status of replicas from the replica queue
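`check_replica` and `replica_checker` describe a producer/consumer pattern: replicas are put on a queue, a pool of worker coroutines checks them concurrently, and `disconnect` stops the workers. A minimal asyncio sketch of that pattern; `check_all`, the worker count and the `check` coroutine passed in are assumptions, not this module's API:

```python
import asyncio


async def replica_checker(queue: asyncio.Queue, check, results: dict) -> None:
    """Worker loop: check each queued replica and record the outcome."""
    while True:
        replica = await queue.get()
        try:
            results[replica] = await check(replica)
        except Exception as exc:  # record the failure instead of killing the worker
            results[replica] = exc
        finally:
            queue.task_done()


async def check_all(replicas, check, n_workers: int = 4) -> dict:
    """Queue every replica, check them with n_workers concurrent workers, then stop."""
    queue: asyncio.Queue = asyncio.Queue()
    results: dict = {}
    for replica in replicas:
        queue.put_nowait(replica)
    workers = [
        asyncio.create_task(replica_checker(queue, check, results))
        for _ in range(n_workers)
    ]
    await queue.join()  # returns once task_done() was called for every item
    for worker in workers:
        worker.cancel()  # otherwise the workers wait on queue.get() forever
    await asyncio.gather(*workers, return_exceptions=True)
    return results
```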

abstract async set_paths(batch: InputBatch, paths: list) → None

Asynchronously set paths for a specific batch of files.

Parameters:
  • batch – InputBatch object containing files to process

  • paths – list of file path dictionaries to use for setting paths

class replicas.PathListFinder(source: MetaRetriever, paths: dict | None = None)

Class for finding paths from a list of explicit file paths

async add_replica(file: MergeFile, path: str, rse_name: str | None = None) → None

Add a replica to a MergeFile object

Parameters:
  • file – MergeFile object to add the replica to

  • path – file path for the replica

  • rse_name – RSE name for the replica

add_rse(rse: BaseRSE) → None

Add an RSE to the list of known RSEs

async connect() → None

Connect to the file source and Rucio

async get_paths(batch: InputBatch) → list

Asynchronously retrieve paths for a specific batch of files.

Parameters:

batch – InputBatch object containing files to retrieve paths for

Returns:

list of file path dictionaries

async set_paths(batch: InputBatch, paths: list) → None

Asynchronously set paths for a specific batch of files.

Parameters:
  • batch – InputBatch object containing files to process

  • paths – list of file path dictionaries from Rucio

class replicas.Replica(path: str, rse: BaseRSE | None = None, status: Status = Status.UNREACHABLE, distance: float = inf)

Class representing a file replica, including its path and status.

property protocol: str

Get the protocol of the replica’s path
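A sketch of the Replica shape as a dataclass, with `protocol` derived from the path and an infinite default distance, so that picking the nearest replica is a `min` over distances. The two Status members and the use of `min` for selection are illustrative assumptions:

```python
from __future__ import annotations

import math
from dataclasses import dataclass
from enum import Enum
from urllib.parse import urlsplit


class Status(Enum):  # hypothetical subset of the real Status members
    ONLINE = "ONLINE"
    UNREACHABLE = "UNREACHABLE"


@dataclass
class Replica:
    path: str
    rse: object | None = None  # a BaseRSE in the real module
    status: Status = Status.UNREACHABLE
    distance: float = math.inf

    @property
    def protocol(self) -> str:
        """URL scheme of the path, or 'file' for a plain local path."""
        return urlsplit(self.path).scheme or "file"
```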

class replicas.RucioFinder(meta: MetaRetriever)

Class for managing asynchronous queries to the Rucio web API.

async add_replica(file: MergeFile, path: str, rse_name: str | None = None) → None

Add a replica to a MergeFile object

Parameters:
  • file – MergeFile object to add the replica to

  • path – file path for the replica

  • rse_name – RSE name for the replica

async checksum(file: MergeFile, rucio: dict) → bool

Ensure file sizes and checksums from Rucio agree with the input metadata.

Parameters:
  • file – MergeFile object to check

  • rucio – Rucio replicas dictionary

Returns:

True if files match, False otherwise
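Rucio replica dictionaries carry the file size under `bytes` and checksums under `adler32` and `md5`. A sketch of the size-and-checksum agreement test, assuming the input metadata is a plain dict with hypothetical `size` and `checksums` keys; a checksum Rucio does not report is skipped rather than treated as a mismatch:

```python
def rucio_matches(meta: dict, rucio: dict) -> bool:
    """Compare size and checksums of input metadata against a Rucio replica dict."""
    if meta.get("size") is not None and rucio.get("bytes") != meta["size"]:
        return False
    for alg, expected in meta.get("checksums", {}).items():
        found = rucio.get(alg)
        if found is not None and str(found).lower() != str(expected).lower():
            return False
    return True
```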

async connect() → None

Connect to the file source and Rucio

async get_paths(batch: InputBatch) → list

Asynchronously retrieve paths for a specific batch of files.

Parameters:

batch – InputBatch object containing files to retrieve paths for

Returns:

list of file path dictionaries

async set_paths(batch: InputBatch, paths: list) → None

Asynchronously set paths for a specific batch of files.

Parameters:
  • batch – InputBatch object containing files to process

  • paths – list of file path dictionaries from Rucio

class replicas.RucioRSE(info: dict)

Class to store information about a Rucio RSE

set_distance() → None

Set the distance offset for this RSE from the config

set_urls(protocols: list) → None

Set the URL prefixes for this RSE from the protocols list

Parameters:

protocols – list of protocol dictionaries from Rucio
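Rucio protocol dictionaries describe each access method with `scheme`, `hostname`, `port` and `prefix` attributes. A sketch of turning them into URL prefixes; how this module actually joins the parts is an assumption, and the hosts below are placeholders:

```python
def url_prefixes(protocols: list) -> list:
    """Build 'scheme://hostname[:port]/prefix' strings from Rucio protocol dicts."""
    prefixes = []
    for p in protocols:
        port = p.get("port")
        netloc = f"{p['hostname']}:{port}" if port else p["hostname"]
        prefix = p.get("prefix", "")
        if not prefix.startswith("/"):
            prefix = "/" + prefix
        prefixes.append(f"{p['scheme']}://{netloc}{prefix}")
    return prefixes
```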

class replicas.Status(value)

Enumeration of possible file statuses

property bad: bool

Return True if this status indicates a bad file replica

property good: bool

Return True if this status indicates a good file replica

class replicas.StatusMeta(cls, bases, classdict, **kwds)

Metaclass for Status enum to allow special handling of ‘ONLINE AND NEARLINE’ status.
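One way such a metaclass can work is by overriding name lookup so that a status string with spaces, as some storage tools print it, resolves to the underscored member. A sketch under that assumption; apart from UNREACHABLE the member names, and which members count as good or bad, are illustrative guesses:

```python
from enum import Enum, EnumMeta


class StatusMeta(EnumMeta):
    """Let Status['ONLINE AND NEARLINE'] resolve to Status.ONLINE_AND_NEARLINE."""

    def __getitem__(cls, name):
        return super().__getitem__(name.replace(" ", "_"))


class Status(Enum, metaclass=StatusMeta):
    ONLINE = 1
    NEARLINE = 2
    ONLINE_AND_NEARLINE = 3
    UNREACHABLE = 4  # neither good nor bad in this sketch: the state is unknown
    BAD_CHECKSUM = 5

    @property
    def good(self) -> bool:
        return self in (Status.ONLINE, Status.NEARLINE, Status.ONLINE_AND_NEARLINE)

    @property
    def bad(self) -> bool:
        return self is Status.BAD_CHECKSUM
```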

replicas.get(metadata: MetaRetriever) → PathFinder

Create and return a physical path finder:
  • PathListFinder if any data files were provided in files input mode

  • PathListFinder if any search directories were provided in other input modes

  • RucioFinder if no data files or search directories were provided

Returns:

PathFinder object for finding file locations
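The selection rule reduces to a small decision function. A sketch that returns the class name rather than an instance; the arguments stand in for whatever configuration MetaRetriever exposes, and the `"files"` mode string is an assumption:

```python
def choose_finder(input_mode: str, data_files: list, search_dirs: list) -> str:
    """Name the path finder that get() would pick (hypothetical inputs)."""
    if input_mode == "files" and data_files:
        return "PathListFinder"
    if input_mode != "files" and search_dirs:
        return "PathListFinder"
    return "RucioFinder"
```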

replicas.get_host(url: str) → str

Extract the host from a URL or local file path

Parameters:

url – URL of the form ‘protocol://host:port/path’, or a local file path

Returns:

host from URL, or ‘local’ if it is a local file path

replicas.get_path(url: str) → str

Extract the path from a URL or local file path

Parameters:

url – URL of the form ‘protocol://host:port/path’, or a local file path

Returns:

path from URL, or the original string if it is a local file path

replicas.get_port(url: str) → int

Extract the port from a URL or local file path

Parameters:

url – URL of the form ‘protocol://host:port/path’, or a local file path

Returns:

port from URL, or None if it is a local file path or no port is specified

replicas.get_protocol(url: str) → str

Extract the protocol from a URL or local file path

Parameters:

url – URL of the form ‘protocol://host:port/path’, or a local file path

Returns:

protocol from URL, or ‘file’ if it is a local file path
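The four URL helpers above share one parsing step. A sketch that reproduces the documented behaviour with `urllib.parse.urlsplit`; it is a reimplementation for illustration, not the module's actual code. Note that xrootd URLs keep their leading double slash in the path:

```python
from __future__ import annotations

from urllib.parse import urlsplit


def get_protocol(url: str) -> str:
    return urlsplit(url).scheme or "file"


def get_host(url: str) -> str:
    return urlsplit(url).hostname or "local"


def get_port(url: str) -> int | None:
    return urlsplit(url).port  # None for local paths or when no port is given


def get_path(url: str) -> str:
    parts = urlsplit(url)
    return parts.path if parts.scheme else url
```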

replicas.path_to_xrootd(path: str) → str

Convert a local file path to an xrootd URL

Parameters:

path – local file path

Returns:

xrootd URL corresponding to the local file path, or None if conversion fails

replicas.xrootd_to_path(url: str) → str

Convert an xrootd URL to a local file path

Parameters:

url – xrootd URL

Returns:

local file path corresponding to the xrootd URL, or None if conversion fails
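Both conversions depend on how a site's local mount points map onto its xrootd doors, which this page does not describe. A sketch assuming a simple prefix table; the mapping and host are hypothetical placeholders:

```python
from __future__ import annotations

# Hypothetical mapping from local mount prefixes to xrootd URL prefixes.
PREFIX_MAP = {
    "/pnfs/example/": "root://xrootd.example.org:1094//pnfs/example/",
}


def path_to_xrootd(path: str) -> str | None:
    """Swap a known local prefix for its xrootd prefix; None if none applies."""
    for local, remote in PREFIX_MAP.items():
        if path.startswith(local):
            return remote + path[len(local):]
    return None


def xrootd_to_path(url: str) -> str | None:
    """Inverse of path_to_xrootd; None if the URL has no known prefix."""
    for local, remote in PREFIX_MAP.items():
        if url.startswith(remote):
            return local + url[len(remote):]
    return None
```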