# Source code for merge_utils.pass2_fix
"""Update pass 2 json files and add to cvmfs directory before submission"""
import sys
import os
import shutil
import json
import tarfile
import subprocess
from rucio.client.replicaclient import ReplicaClient
# [docs]
def get_cfgs(cfg_dir: str, files: list[str]) -> dict:
    """Load the named JSON configuration files found in ``cfg_dir``.

    Args:
        cfg_dir: Directory that holds the configuration files.
        files: File names, relative to ``cfg_dir``, to load.

    Returns:
        A dict mapping each entry of ``files`` to its parsed JSON content.

    Exits the interpreter with status 1 as soon as a listed file is not found.
    """
    loaded = {}
    for file_name in files:
        full_path = os.path.join(cfg_dir, file_name)
        if os.path.isfile(full_path):
            with open(full_path, encoding="utf-8") as handle:
                loaded[file_name] = json.load(handle)
            continue
        # Any missing file is fatal: report it and stop.
        print(f"ERROR: Configuration file {full_path} does not exist!")
        sys.exit(1)
    return loaded
# [docs]
def get_pfns(inputs: set) -> dict:
    """Get the physical file names from Rucio for the given input DIDs.

    Args:
        inputs: DIDs in ``scope:name`` form.

    Returns:
        A dict mapping every input DID to one PFN. When Rucio lists several
        replicas for a DID, the first one in the returned ``pfns`` collection
        is used and a warning is printed. An empty ``inputs`` yields an empty
        dict without contacting Rucio.

    Exits the interpreter with status 1 when a DID has no ``:`` separator or
    when any input DID has no usable replica.
    """
    if not inputs:
        # Nothing to look up; do not send an empty query to Rucio.
        return {}

    query = []
    malformed = []
    for did in inputs:
        scope, sep, name = did.partition(':')
        if not sep:
            malformed.append(did)
            continue
        query.append({'scope': scope, 'name': name})
    if malformed:
        print(f"ERROR: Found {len(malformed)} malformed DID(s), expected 'scope:name':")
        for did in sorted(malformed):
            print(f" {did}")
        sys.exit(1)

    found = {}
    client = ReplicaClient()
    for replicas in client.list_replicas(query, ignore_availability=False):
        did = replicas['scope'] + ':' + replicas['name']
        # NOTE(review): 'pfns' is assumed to be a sized iterable of PFNs
        # (a dict keyed by PFN in the Rucio client) - confirm.
        pfns = replicas['pfns']
        count = len(pfns)
        if count == 0:
            # Reported below together with DIDs Rucio did not return at all.
            continue
        if count > 1:
            print(f"WARNING: Found {count} replicas for {did}, using the first one")
        found[did] = next(iter(pfns))

    # Derive the failure list from `found` alone so a DID that came back with
    # zero replicas is listed (and counted) exactly once.
    unreachable = sorted(did for did in inputs if did not in found)
    if unreachable:
        print(f"ERROR: Failed to retrieve {len(unreachable)} file(s) from Rucio:")
        for did in unreachable:
            print(f" {did}")
        print("Did the first merging pass complete successfully?")
        sys.exit(1)
    return found
# [docs]
def main():
    """Main function for command line execution.

    Usage: ``pass2_fix <cfg_dir> [<cfg_file> ...]``

    Steps:
      1. Load the listed configuration files from ``cfg_dir``.
      2. Replace every entry of their ``inputs`` lists with the PFN returned
         by ``get_pfns``.
      3. Copy ``config.tar`` from the parent directory of ``cfg_dir`` to
         ``config_pass2.tar`` and append the corrected files under their
         original names.
      4. Upload the archive with ``justin-cvmfs-upload`` and run
         ``pass2_justin.sh`` with the location printed by the upload.

    Exits the interpreter with status 1 on any failure.
    """
    if len(sys.argv) < 2:
        print("Usage: pass2_fix <cfg_dir> [<cfg_file> ...]")
        sys.exit(1)

    cfg_dir = sys.argv[1]
    cfgs = get_cfgs(cfg_dir, sys.argv[2:])
    # abspath() drops a trailing separator and anchors relative paths, so the
    # parent directory is found for "job/cfgs/" and "cfgs" alike, and the
    # submission script is never resolved through PATH.
    job_dir = os.path.dirname(os.path.abspath(cfg_dir))

    inputs = set()
    for cfg in cfgs.values():
        inputs.update(cfg['inputs'])
    print(f"Found {len(inputs)} unique inputs in {len(cfgs)} configuration files")

    print("Retrieving physical file paths from Rucio")
    pfns = get_pfns(inputs)

    # Start from a fresh copy of the base archive every time.
    cfg_pass2 = os.path.join(job_dir, "config_pass2.tar")
    if os.path.exists(cfg_pass2):
        os.remove(cfg_pass2)
    cfg_base = os.path.join(job_dir, "config.tar")
    if not os.path.isfile(cfg_base):
        print(f"ERROR: Base configuration file {cfg_base} does not exist!")
        sys.exit(1)
    shutil.copyfile(cfg_base, cfg_pass2)

    with tarfile.open(cfg_pass2, "a") as tar:
        for name, cfg in cfgs.items():
            cfg['inputs'] = [pfns[did] for did in cfg['inputs']]
            # Insert "_fixed" before the extension only. A plain str.replace
            # would touch every ".json" in the name and, for a name without
            # ".json", would overwrite the original configuration file.
            root, ext = os.path.splitext(name)
            fix_name = os.path.join(cfg_dir, f"{root}_fixed{ext}")
            if os.path.exists(fix_name):
                os.remove(fix_name)
            with open(fix_name, 'w', encoding="utf-8") as fjson:
                json.dump(cfg, fjson, indent=2)
            # Store the corrected file under the original name in the archive.
            tar.add(fix_name, name)

    print("Uploading corrected configuration files to cvmfs")
    proc = subprocess.run(['justin-cvmfs-upload', cfg_pass2], capture_output=True, check=False)
    if proc.returncode != 0:
        print(f"Failed to upload configuration files: {proc.stderr.decode('utf-8')}")
        sys.exit(1)
    cvmfs_dir = proc.stdout.decode('utf-8').strip()
    print(f"Uploaded configuration files to {cvmfs_dir}")

    print("Submitting pass 2 jobs to JustIN")
    submit = subprocess.run([os.path.join(job_dir, "pass2_justin.sh"), cvmfs_dir], check=False)
    if submit.returncode != 0:
        # Surface a failed submission instead of exiting with status 0.
        print(f"ERROR: Pass 2 submission failed with exit code {submit.returncode}")
        sys.exit(1)
# Run the pass 2 fix only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()