scheduler

JobScheduler classes

class scheduler.JobScheduler(source: PathFinder)[source]

Base class for scheduling a merge job

Initialize the JobScheduler with a source of files to merge.

Parameters:

source – PathFinder object to provide input files

assign_site(chunk: MergeChunk, site: str | None = None) None[source]

Assign a merging site for a chunk of files, and select the best replica for each file

Parameters:
  • chunk – MergeChunk object to assign a site for

  • site – Site to assign the chunk to (optional, defaults to None)
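
One way to picture "select the best replica for each file" is to choose, for a given site, the replica with the smallest finite distance. The sketch below is illustrative only: the function name best_replica, the nested-dictionary layout, and the use of math.inf for an unreachable site are assumptions, not part of the scheduler API.

```python
import math
from typing import Optional

def best_replica(replica_dists: dict, site: str) -> Optional[str]:
    """Return the replica location closest to `site`, or None if none can reach it.

    `replica_dists` maps a (hypothetical) replica location name to a
    dictionary of site name -> distance.
    """
    candidates = {
        rse: dists[site]
        for rse, dists in replica_dists.items()
        # A missing entry or an infinite distance is treated as unreachable.
        if math.isfinite(dists.get(site, math.inf))
    }
    if not candidates:
        return None
    return min(candidates, key=candidates.get)

replicas = {
    "rse_1": {"site_a": 20.0},
    "rse_2": {"site_a": 5.0, "site_b": 0.0},
}
choice_a = best_replica(replicas, "site_a")
choice_c = best_replica(replicas, "site_c")
```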

chunk_distances(chunk: MergeChunk) dict[source]

Get the distances from a chunk to potential merging sites, based on the file distances. If any file is unreachable from a site, the chunk is also unreachable from that site.

Parameters:

chunk – MergeChunk object to get distances for

Returns:

Dictionary mapping site names to distances
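
As a rough illustration of the unreachability rule, the sketch below combines per-file distance dictionaries into one chunk-level dictionary. It is not the library's implementation: the name combine_distances, the use of math.inf to mark an unreachable site, and taking the largest file distance as the chunk distance are all assumptions made for the example.

```python
import math

def combine_distances(file_dists: list) -> dict:
    """Combine per-file site distances into per-chunk site distances.

    Assumptions: a missing site or math.inf means "unreachable", and the
    chunk distance to a site is the worst (largest) file distance.
    """
    sites = set()
    for dists in file_dists:
        sites.update(dists)
    chunk = {}
    for site in sorted(sites):
        # One unreachable file makes the whole chunk unreachable from the site.
        chunk[site] = max(d.get(site, math.inf) for d in file_dists)
    return chunk

example = combine_distances([
    {"site_a": 0.0, "site_b": 10.0},
    {"site_a": 5.0, "site_b": math.inf},
])
```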

async connect() None[source]

Connect to the file source

async disconnect() None[source]

Disconnect from the file source
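
Because connect() and disconnect() are coroutines, a caller would normally pair them so that the source is released even if the work in between fails. The sketch below shows that pattern with a stand-in class; FakeScheduler and with_connection are hypothetical names, and only the two coroutine names come from the documentation above.

```python
import asyncio

class FakeScheduler:
    """Hypothetical stand-in exposing connect/disconnect coroutines."""

    def __init__(self):
        self.events = []

    async def connect(self) -> None:
        self.events.append("connect")

    async def disconnect(self) -> None:
        self.events.append("disconnect")

async def with_connection(scheduler) -> None:
    await scheduler.connect()
    try:
        # Placeholder for the real work done while connected.
        scheduler.events.append("work")
    finally:
        # Always release the source, even if the work raised.
        await scheduler.disconnect()

fake = FakeScheduler()
asyncio.run(with_connection(fake))
```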

file_distances(file: MergeFile) dict[source]

Get the distances from a file to potential merging sites, based on its replicas.

Parameters:

file – MergeFile object to get distances for

Returns:

Dictionary mapping site names to distances
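
A plausible reading of "based on its replicas" is that a file is as close to a site as its nearest replica. The sketch below implements that reading; the function name and the choice of the minimum are assumptions, not confirmed behavior of file_distances.

```python
import math

def nearest_replica_distances(replica_dists: list) -> dict:
    """Per-site distance for a file, taken as the distance of its closest replica."""
    best = {}
    for dists in replica_dists:
        for site, dist in dists.items():
            # Keep the smallest distance seen so far for each site.
            best[site] = min(dist, best.get(site, math.inf))
    return best

file_dists = nearest_replica_distances([
    {"site_a": 30.0, "site_b": 0.0},
    {"site_a": 10.0},
])
```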

property files: MergeSet

Return the set of files from the source

async input_batches() AsyncGenerator[InputBatch, None][source]

Asynchronously check RSE-site distances for batches of input files

Returns:

InputBatch objects, each containing a skip index and a list of MergeFile objects
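
Since input_batches() is an asynchronous generator, it is consumed with async for rather than a plain loop. The sketch below demonstrates the consumption pattern with a stand-in generator; FakeBatch, its field names, and fake_input_batches are hypothetical and do not reflect the real InputBatch layout.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator

@dataclass
class FakeBatch:
    """Stand-in for InputBatch; the field names are assumptions."""
    skip: int
    files: list = field(default_factory=list)

async def fake_input_batches() -> AsyncGenerator[FakeBatch, None]:
    names = ["a.root", "b.root", "c.root"]
    for start in range(0, len(names), 2):
        await asyncio.sleep(0)  # hand control back to the loop, as a real lookup would
        yield FakeBatch(skip=start, files=names[start:start + 2])

async def collect() -> list:
    seen = []
    async for batch in fake_input_batches():
        seen.extend(batch.files)
    return seen

collected = asyncio.run(collect())
```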

async replica_distances(replica: Replica) dict[source]

Get the distances from a replica to potential merging sites.

Parameters:

replica – Replica object to get distances for

Returns:

Dictionary mapping site names to distances

run() None[source]

Run the job scheduler.

Returns:

None

run_loop() None[source]

Retrieve metadata for all files.

abstract schedule(chunk: MergeChunk) None[source]

Schedule a chunk for merging, subdividing and assigning to sites as necessary.

Parameters:

chunk – MergeChunk object to schedule

split_files(files: list) list[list][source]

Split a list of files into groups for merging, based on the configured chunk size.

Parameters:

files – List of MergeFile objects to split

Returns:

List of lists of MergeFile objects, where each sublist is a group for merging
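
The simplest form of this splitting is fixed-size slicing, sketched below. The parameter name chunk_max is hypothetical, and the real method may balance group sizes or read the limit from configuration rather than take it as an argument.

```python
def split_into_groups(files: list, chunk_max: int) -> list:
    """Split `files` into consecutive groups of at most `chunk_max` items."""
    if chunk_max < 1:
        raise ValueError("chunk_max must be at least 1")
    return [files[i:i + chunk_max] for i in range(0, len(files), chunk_max)]

groups = split_into_groups(["f1", "f2", "f3", "f4", "f5"], chunk_max=2)
```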

abstract write_script() list[source]

Write the job script

Returns:

List of the generated script file name(s)
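
The two abstract methods, schedule() and write_script(), are the hooks a concrete scheduler has to supply. The sketch below shows the general shape with Python's abc module; SchedulerSketch and EchoScheduler are hypothetical stand-ins that do not inherit from the real JobScheduler, and the script naming is invented for the example.

```python
from abc import ABC, abstractmethod

class SchedulerSketch(ABC):
    """Hypothetical stand-in for the base class, reduced to its abstract hooks."""

    def __init__(self):
        self.scheduled = []

    @abstractmethod
    def schedule(self, chunk) -> None:
        ...

    @abstractmethod
    def write_script(self) -> list:
        ...

class EchoScheduler(SchedulerSketch):
    def schedule(self, chunk) -> None:
        # A real scheduler would subdivide the chunk and assign sites here.
        self.scheduled.append(chunk)

    def write_script(self) -> list:
        # Return one (made-up) script name per scheduled chunk.
        return [f"merge_{i}.sh" for i, _ in enumerate(self.scheduled)]

sched = EchoScheduler()
sched.schedule(["f1", "f2"])
scripts = sched.write_script()
```

Instantiating SchedulerSketch directly raises TypeError, which is how the abc module enforces that both hooks are implemented.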

write_specs(chunk) None[source]

Write merge specs for a chunk to JSON dictionary files.

Parameters:

chunk – MergeChunk object to write
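
Writing a spec as a JSON dictionary file can be sketched with the standard json module. The spec keys ("inputs", "output"), the file naming, and the helper name write_spec are all assumptions; the actual contents written by write_specs are not described here.

```python
import json
import tempfile
from pathlib import Path

def write_spec(spec: dict, directory: Path, name: str) -> Path:
    """Write one spec dictionary to <directory>/<name>.json and return the path."""
    path = directory / f"{name}.json"
    with path.open("w", encoding="utf-8") as handle:
        json.dump(spec, handle, indent=2)
    return path

spec = {"inputs": ["f1", "f2"], "output": "merged"}
with tempfile.TemporaryDirectory() as tmp:
    out = write_spec(spec, Path(tmp), "chunk_0")
    # Read the file back to confirm it round-trips.
    round_trip = json.loads(out.read_text(encoding="utf-8"))
```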

class scheduler.JustinScheduler(source: PathFinder)[source]

Job scheduler for JustIN merge jobs

Initialize the JustinScheduler with a source of files to merge.

Parameters:

source – PathFinder object to provide input files

async connect() None[source]

Connect to the file source

justin_cmd(tier: int, site: str) str[source]

Create the JustIN command for submitting a merge job.

Parameters:
  • tier – Merge pass number (0-indexed!)

  • site – Site to run the job

  • cvmfs_dir – CVMFS directory where config files are located (not part of the signature shown above)

Returns:

JustIN command string

schedule(chunk: MergeChunk) None[source]

Schedule a chunk for merging, subdividing and assigning to sites as necessary.

Parameters:

chunk – MergeChunk object to schedule

upload_cfg() None[source]

Make a tarball of the configuration files and upload it to CVMFS

Returns:

Path to the uploaded configuration directory

write_script() list[source]

Write the job scripts for submitting JustIN merge jobs.

Returns:

List of the generated script file name(s)

class scheduler.LocalScheduler(source)[source]

Job scheduler for local merge jobs

Initialize the LocalScheduler with a source of files to merge.

Parameters:

source – PathFinder object to provide input files

async connect() None[source]

Connect to the file source

async replica_distances(replica: Replica) dict[source]

Get the distances from a replica to potential merging sites.

Parameters:

replica – Replica object to get distances for

Returns:

Dictionary mapping site names to distances

schedule(chunk: MergeChunk) None[source]

Schedule a chunk for merging, clearing any site assignment and subdividing as necessary.

Parameters:

chunk – MergeChunk object to schedule

write_script() list[source]

Write the job script for running local merge jobs.

Returns:

List containing the name of the generated script file