"""JobScheduler classes"""
import collections
import json
import logging
import os
import shutil
import subprocess
import sys
import tarfile
from abc import ABC, abstractmethod
from typing import Optional

from merge_utils import io_utils, config
from merge_utils.retriever import PathFinder
logger = logging.getLogger(__name__)
class JobScheduler(ABC):
    """Base class for scheduling a merge job"""

    def __init__(self, source: "PathFinder"):
        """
        Initialize the JobScheduler with a source of files to merge.

        :param source: PathFinder object to provide input files
        """
        self.source = source
        # Job configs go into a unique, timestamped directory under the
        # configured scripts area so successive runs never clobber each other.
        self.dir = io_utils.expand_path(
            os.path.join(config.output['scripts'], io_utils.get_timestamp()),
            base_dir=io_utils.pkg_dir()
        )
        # jobs[0] holds pass-1 jobs, jobs[1] pass-2 jobs; each maps
        # site -> list of (json file name, MergeChunk) tuples.
        self.jobs = [collections.defaultdict(list), collections.defaultdict(list)]

    def write_json(self, chunk) -> str:
        """
        Write a JSON dictionary to a file and return the file name.

        :param chunk: MergeChunk object to write
        :return: Name of the written JSON file
        """
        json_dict = chunk.json
        site = chunk.site
        tier = chunk.tier
        site_jobs = self.jobs[tier-1][site]
        # 1-based, zero-padded sequence number within this site/pass
        idx = len(site_jobs) + 1
        if site:
            name = f"pass{tier}_{site}_{idx:>06}.json"
        else:
            name = f"pass{tier}_{idx:>06}.json"
        name = os.path.join(self.dir, name)
        with open(name, 'w', encoding="utf-8") as fjson:
            json.dump(json_dict, fjson, indent=2)
        site_jobs.append((name, chunk))
        return name

    @abstractmethod
    def write_script(self) -> list:
        """
        Write the job script

        :return: List of the generated script file name(s)
        """

    def run(self) -> None:
        """
        Run the Job scheduler.

        :return: None
        """
        self.source.run()
        # exist_ok guards against a rerun landing in the same timestamp window
        os.makedirs(self.dir, exist_ok=True)
        for chunk in self.source.output_chunks():
            self.write_json(chunk)
        if not self.jobs[0]:
            logger.critical("No files to merge")
            return
        io_utils.log_print(f"Writing job config files to {self.dir}")
        n_inputs = 0
        n_stage1 = 0
        n_outputs = 0
        msg = [""]  # placeholder for the summary header, filled in below
        for site, site_jobs in self.jobs[0].items():
            site_inputs = sum(len(job[1]) for job in site_jobs)
            site_stage1 = len(site_jobs)
            # NOTE(review): chunks with a negative chunk_id appear to be final
            # outputs that skip pass 2 — confirm against MergeChunk
            site_outputs = sum(1 for job in site_jobs if job[1].chunk_id < 0)
            n_inputs += site_inputs
            n_stage1 += site_stage1
            n_outputs += site_outputs
            if site is None:
                site = "local"
            msg.append(f"{site}: \t{site_inputs} -> {site_stage1}")
        n_stage2 = sum(len(site_jobs) for site_jobs in self.jobs[1].values())
        n_outputs += n_stage2
        script = self.write_script()
        msg[0] = f"Merging {n_inputs} input files into {n_outputs} merged files:"
        io_utils.log_print("\n ".join(msg))
        # More than one script means a second merging pass was scheduled
        if len(script) > 1:
            if len(self.jobs[0]) > 1:
                msg = ["A second merging pass is required due to distributed inputs:"]
            else:
                msg = ["A second merging pass is required due to high multiplicity:"]
            for site, site_jobs in self.jobs[1].items():
                site_inputs = sum(len(job[1].inputs) for job in site_jobs)
                site_stage2 = len(site_jobs)
                msg.append(f"{site}: \t{site_inputs} -> {site_stage2}")
            io_utils.log_print("\n ".join(msg))
        io_utils.log_print(f"Files will be merged using {config.merging['method']['name']}")
        msg = ["Execute the merge by running:"] + script
        io_utils.log_print("\n ".join(msg))
class LocalScheduler(JobScheduler):
    """Job scheduler for local merge jobs"""

    def write_json(self, chunk) -> str:
        """
        Write a JSON dictionary to a file and return the file name.

        :param chunk: MergeChunk object to write
        :return: Name of the written JSON file
        """
        chunk.site = None  # Local jobs do not require a site
        return super().write_json(chunk)

    def write_script(self) -> list:
        """
        Write the job script for running local merge jobs.

        :return: List containing the generated script file name
        """
        out_dir = io_utils.expand_path(config.output['dir'])
        if not os.path.exists(out_dir):
            try:
                os.makedirs(out_dir, exist_ok=True)
                logger.info("Output directory '%s' created", out_dir)
            except OSError as error:
                logger.critical("Failed to create output directory '%s': %s", out_dir, error)
                sys.exit(1)
        # Copy the merge method's dependencies next to the job configs so the
        # generated script can run self-contained from self.dir
        for dep in config.merging['method']['dependencies']:
            file_name = os.path.basename(dep)
            logger.debug("Adding %s to job directory", file_name)
            shutil.copyfile(dep, os.path.join(self.dir, file_name))
        script_name = os.path.join(self.dir, "run.sh")
        pass_msg = [
            "echo 'Creating intermediate merged files'",
            "echo 'Creating final merged files'"
        ]
        with open(script_name, 'w', encoding="utf-8") as f:
            f.write("#!/bin/bash\n")
            f.write("# This script will run the merge jobs locally\n")
            for tier in range(2):
                # Only announce the passes when a second pass actually exists
                if self.jobs[1]:
                    f.write(pass_msg[tier] + "\n")
                # Local jobs are all keyed under site None (see write_json)
                for job in self.jobs[tier][None]:
                    cmd = ["LD_PRELOAD=$XROOTD_LIB/libXrdPosixPreload.so", "python3",
                           io_utils.find_runner("do_merge.py"), job[0], out_dir]
                    f.write(f"{' '.join(cmd)}\n")
        # os.chmod avoids spawning a subprocess and, unlike the previous
        # unchecked `chmod` call, does not silently ignore failures
        os.chmod(script_name, os.stat(script_name).st_mode | 0o111)
        return [script_name]
class JustinScheduler(JobScheduler):
    """Job scheduler for JustIN merge jobs"""

    def __init__(self, source: "PathFinder"):
        """
        Initialize the JustinScheduler with a source of files to merge.

        :param source: PathFinder object to provide input files
        """
        super().__init__(source)
        # Set by upload_cfg() once the config tarball has been published
        self.cvmfs_dir = None

    @staticmethod
    def _make_executable(path: str) -> None:
        """Set the executable bits on a generated script file."""
        os.chmod(path, os.stat(path).st_mode | 0o111)

    def upload_cfg(self) -> None:
        """
        Make a tarball of the configuration files and upload them to cvmfs.

        The resulting cvmfs path is stored in ``self.cvmfs_dir``.

        :raises RuntimeError: if the upload command fails
        :return: None
        """
        def add_file(tar, file_path=None):
            """Add a single file to the tarball under its base name."""
            if file_path is None:
                return
            file_name = os.path.basename(file_path)
            logger.debug("Adding %s to config tarball", file_name)
            tar.add(file_path, file_name)

        io_utils.log_print("Uploading configuration files to cvmfs...")
        cfg = os.path.join(self.dir, "config.tar")
        with tarfile.open(cfg, "w") as tar:
            # Pass-1 job configs, the merge runner, and any method dependencies
            for site_jobs in self.jobs[0].values():
                for job in site_jobs:
                    add_file(tar, job[0])
            add_file(tar, io_utils.find_runner("do_merge.py"))
            for dep in config.merging['method']['dependencies']:
                add_file(tar, dep)
        proc = subprocess.run(['justin-cvmfs-upload', cfg], capture_output=True, check=False)
        if proc.returncode != 0:
            logger.error("Failed to upload configuration files: %s", proc.stderr.decode('utf-8'))
            raise RuntimeError("Failed to upload configuration files")
        self.cvmfs_dir = proc.stdout.decode('utf-8').strip()
        logger.info("Uploaded configuration files to %s", self.cvmfs_dir)

    def justin_cmd(self, tier: int, site: str, cvmfs_dir: Optional[str] = None) -> str:
        """
        Create the JustIN command for submitting a merge job.

        :param tier: Merge pass (1 or 2)
        :param site: Site to run the job
        :param cvmfs_dir: CVMFS directory where config files are located;
            defaults to self.cvmfs_dir (set by upload_cfg)
        :return: JustIN command string, newline-terminated
        """
        if site is None:
            logger.critical("No site for pass %d job!", tier)
            sys.exit(1)
        if cvmfs_dir is None:
            cvmfs_dir = self.cvmfs_dir
        cmd = [
            'justin', 'simple-workflow',
            '--description', f'"Merge {io_utils.get_timestamp()} p{tier} {site}"',
            # One monte-carlo slot per job config queued for this site/pass
            '--monte-carlo', str(len(self.jobs[tier-1][site])),
            '--jobscript', io_utils.find_runner("merge.jobscript"),
            '--site', site,
            '--scope', config.output['namespace'],
            '--output-pattern', f"*_merged_*{config.merging['method']['ext']}",
            '--lifetime-days', str(config.output['lifetime']),
            '--env', f'MERGE_CONFIG="pass{tier}_{site}"',
            '--env', f'CONFIG_DIR="{cvmfs_dir}"'
        ]
        if config.merging['dune_version']:
            cmd += ['--env', f'DUNE_VERSION="{config.merging["dune_version"]}"']
        if config.merging['dune_qualifier']:
            cmd += ['--env', f'DUNE_QUALIFIER="{config.merging["dune_qualifier"]}"']
        return f"{' '.join(cmd)}\n"

    def _write_pass1_script(self, file_name: str, comment: str) -> str:
        """
        Write a script that submits all pass-1 JustIN jobs.

        :param file_name: Base name of the script inside self.dir
        :param comment: Header comment placed after the shebang line
        :return: Full path of the generated script
        """
        script_name = os.path.join(self.dir, file_name)
        with open(script_name, 'w', encoding="utf-8") as f:
            f.write("#!/bin/bash\n")
            f.write(f"# {comment}\n")
            for site in self.jobs[0]:
                f.write(self.justin_cmd(1, site))
        self._make_executable(script_name)
        return script_name

    def write_script(self) -> list:
        """
        Write the job scripts for submitting JustIN merge jobs.

        :return: List of the generated script file name(s)
        """
        self.upload_cfg()
        # If no second pass is needed, create a single submission script
        if len(self.jobs[1]) == 0:
            script_name = self._write_pass1_script(
                "submit.sh", "This script will submit the JustIN jobs")
            return [script_name]
        # Pass 1 submission script
        script_pass1 = self._write_pass1_script(
            "submit_pass1.sh", "This script will submit JustIN jobs for pass 1")
        # Pass 2 JustIN submission commands; the cvmfs directory holding the
        # updated pass-2 configs is only known after pass 1, so it is taken
        # as a positional argument rather than baked in
        pass2_justin = os.path.join(self.dir, "pass2_justin.sh")
        with open(pass2_justin, 'w', encoding="utf-8") as f:
            f.write("#!/bin/bash\n"
                    "# This script will submit JustIN jobs for pass 2\n"
                    "# Use submit_pass2.sh to generate the cvmfs directory first!\n"
                    "cvmfs_dir=$1\n"
                    "if [ -z \"$cvmfs_dir\" ]; then\n"
                    " echo 'Use submit_pass2.sh instead of calling this script directly!'\n"
                    " exit 1\n"
                    "fi\n")
            for site in self.jobs[1]:
                f.write(self.justin_cmd(2, site, "$cvmfs_dir"))
        self._make_executable(pass2_justin)
        # Pass 2 submission script: fixes up the pass-2 cfg files before
        # submission via pass2_fix.py
        script_pass2 = os.path.join(self.dir, "submit_pass2.sh")
        pass2_cfgs = []
        for site_jobs in self.jobs[1].values():
            pass2_cfgs.extend(os.path.basename(job[0]) for job in site_jobs)
        with open(script_pass2, 'w', encoding="utf-8") as f:
            f.write("#!/bin/bash\n")
            f.write("# This script will update the cfg files for pass 2 before submission\n")
            pass2_fix = os.path.join(io_utils.src_dir(), 'pass2_fix.py')
            f.write(f"python3 {pass2_fix} {self.dir} {' '.join(pass2_cfgs)}\n")
        self._make_executable(script_pass2)
        return [script_pass1, script_pass2]