scheduler
JobScheduler classes
- class scheduler.JobScheduler(source: PathFinder)
Base class for scheduling a merge job
Initialize the JobScheduler with a source of files to merge.
- Parameters:
source – PathFinder object to provide input files
- assign_site(chunk: MergeChunk, site: str | None = None) → None
Assign a merging site for a chunk of files, and select the best replica for each file
- Parameters:
chunk – MergeChunk object to assign a site for
site – Name of the site to assign (optional; defaults to None)
- chunk_distances(chunk: MergeChunk) → dict
Get the distances from a chunk to potential merging sites, based on the file distances. If any file is unreachable from a site, the chunk is also unreachable from that site.
- Parameters:
chunk – MergeChunk object to get distances for
- Returns:
Dictionary mapping site names to distances
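The unreachability rule described for chunk_distances can be illustrated with a minimal sketch. It is not the library's implementation: the function name `combine_file_distances` is hypothetical, unreachable sites are assumed to be represented by `math.inf` (or by being absent from a file's mapping), and summing the per-file distances is an assumed way of combining them.

```python
import math


def combine_file_distances(per_file: list[dict[str, float]]) -> dict[str, float]:
    """Combine per-file site distances into per-chunk distances (illustrative only)."""
    # Collect every site mentioned by at least one file.
    sites: set[str] = set()
    for distances in per_file:
        sites.update(distances)

    chunk_distances: dict[str, float] = {}
    for site in sites:
        # Assumption: a site missing from a file's mapping is unreachable for that file.
        values = [distances.get(site, math.inf) for distances in per_file]
        if any(math.isinf(value) for value in values):
            # One unreachable file makes the whole chunk unreachable from this site.
            chunk_distances[site] = math.inf
        else:
            # Assumption: the chunk distance is the sum of the file distances.
            chunk_distances[site] = sum(values)
    return chunk_distances


example = combine_file_distances(
    [{"site_a": 1.0, "site_b": 2.0}, {"site_a": 3.0, "site_b": math.inf}]
)
```

In this example `site_a` stays reachable with a combined distance, while `site_b` is marked unreachable because one file cannot be read from it.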
- file_distances(file: MergeFile) → dict
Get the distances from a file to potential merging sites, based on its replicas.
- Parameters:
file – MergeFile object to get distances for
- Returns:
Dictionary mapping site names to distances
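One plausible way to derive a file's distances from its replicas is to keep, for each site, the distance of the nearest replica. The sketch below assumes exactly that; the function name `nearest_replica_distances` and the use of `math.inf` for unreachable sites are hypothetical, and the real method may combine replicas differently.

```python
import math


def nearest_replica_distances(per_replica: list[dict[str, float]]) -> dict[str, float]:
    """Keep the smallest distance seen for each site across all replicas (illustrative only)."""
    best: dict[str, float] = {}
    for distances in per_replica:
        for site, distance in distances.items():
            # A site is as close as its nearest replica.
            best[site] = min(distance, best.get(site, math.inf))
    return best


file_example = nearest_replica_distances(
    [{"site_a": 5.0, "site_b": math.inf}, {"site_a": 2.0, "site_b": 7.0}]
)
```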
- async input_batches() → AsyncGenerator[InputBatch, None]
Asynchronously check RSE-site distances for batches of input files
- Returns:
InputBatch objects, yielded one at a time, each containing a skip index and a list of MergeFile objects
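Because input_batches is an async generator, callers consume it with `async for` inside a coroutine. The sketch below shows that pattern with stand-ins only: `FakeBatch`, its `skip` and `files` fields, and `fake_input_batches` are hypothetical and do not reflect the real InputBatch attributes.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator


@dataclass
class FakeBatch:
    """Hypothetical stand-in for InputBatch; field names are assumptions."""
    skip: int
    files: list[str] = field(default_factory=list)


async def fake_input_batches() -> AsyncGenerator[FakeBatch, None]:
    """Stand-in generator that yields two batches of two files each."""
    for start in range(0, 4, 2):
        await asyncio.sleep(0)  # placeholder for the asynchronous distance checks
        yield FakeBatch(skip=start, files=[f"file_{start}", f"file_{start + 1}"])


async def collect_files() -> list[str]:
    """Consume the async generator batch by batch."""
    collected: list[str] = []
    async for batch in fake_input_batches():
        collected.extend(batch.files)
    return collected


all_files = asyncio.run(collect_files())
```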
- async replica_distances(replica: Replica) → dict
Get the distances from a replica to potential merging sites.
- Parameters:
replica – Replica object to get distances for
- Returns:
Dictionary mapping site names to distances
- abstract schedule(chunk: MergeChunk) → None
Schedule a chunk for merging, subdividing and assigning to sites as necessary.
- Parameters:
chunk – MergeChunk object to schedule
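Since schedule is abstract, a concrete scheduler has to override it before it can be instantiated. The following sketch shows the general pattern using Python's `abc` module. `SchedulerBase` and `RecordingScheduler` are hypothetical stand-ins, and it is an assumption that JobScheduler is built on `abc.ABC` in the same way.

```python
from abc import ABC, abstractmethod


class SchedulerBase(ABC):
    """Hypothetical stand-in for JobScheduler, reduced to its abstract method."""

    @abstractmethod
    def schedule(self, chunk) -> None:
        """Schedule a chunk for merging."""


class RecordingScheduler(SchedulerBase):
    """Toy subclass that only records the chunks it was asked to schedule."""

    def __init__(self) -> None:
        self.scheduled: list = []

    def schedule(self, chunk) -> None:
        self.scheduled.append(chunk)


# The base class cannot be instantiated while schedule() is abstract.
try:
    SchedulerBase()
    base_instantiable = True
except TypeError:
    base_instantiable = False

scheduler = RecordingScheduler()
scheduler.schedule("chunk-0")
```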
- split_files(files: list) → list[list]
Split a list of files into groups for merging, based on the configured chunk size.
- Parameters:
files – List of MergeFile objects to split
- Returns:
List of lists of MergeFile objects, where each sublist is a group for merging
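The grouping performed by split_files can be pictured as fixed-size slicing. The sketch below is an assumption-laden illustration: `split_into_groups` is a hypothetical helper that takes the chunk size as an explicit argument, whereas the real method reads it from configuration and may balance group sizes differently.

```python
def split_into_groups(files: list, chunk_size: int) -> list[list]:
    """Split files into consecutive groups of at most chunk_size items (illustrative only)."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    # Slice the list in steps of chunk_size; the last group may be shorter.
    return [files[i:i + chunk_size] for i in range(0, len(files), chunk_size)]


groups = split_into_groups(["f0", "f1", "f2", "f3", "f4"], chunk_size=2)
```

With five files and a chunk size of two, the last group holds the single leftover file.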
- class scheduler.JustinScheduler(source: PathFinder)
Job scheduler for JustIN merge jobs
Initialize the JustinScheduler with a source of files to merge.
- Parameters:
source – PathFinder object to provide input files
- justin_cmd(tier: int, site: str) → str
Create the JustIN command for submitting a merge job.
- Parameters:
tier – Merge pass number (0-indexed)
site – Site to run the job
cvmfs_dir – CVMFS directory where config files are located
- Returns:
JustIN command string
- schedule(chunk: MergeChunk) → None
Schedule a chunk for merging, subdividing and assigning to sites as necessary.
- Parameters:
chunk – MergeChunk object to schedule
- class scheduler.LocalScheduler(source)
Job scheduler for local merge jobs
Initialize the JobScheduler with a source of files to merge.
- Parameters:
source – PathFinder object to provide input files
- async replica_distances(replica: Replica) → dict
Get the distances from a replica to potential merging sites.
- Parameters:
replica – Replica object to get distances for
- Returns:
Dictionary mapping site names to distances
- schedule(chunk: MergeChunk) → None
Schedule a chunk for merging, clearing any site assignment and subdividing as necessary.
- Parameters:
chunk – MergeChunk object to schedule