Source code for merge_utils.__main__

"""Command line interface for merge_utils."""

import argparse
import logging
import sys

from merge_utils import io_utils, config, scheduler, local

logger = logging.getLogger(__name__)

[docs] def main(): """Test the command line interface for merge_utils.""" parser = argparse.ArgumentParser( description='Command line interface for merge_utils') parser.add_argument('-c', '--config', action='append', metavar='CFG', help='a configuration file') parser.add_argument('-v', '--verbose', action='count', default=0, help='print more verbose output (e.g. -vvv for debug output)') parser.add_argument('--log', help='specify a custom log file path') in_group = parser.add_argument_group('input arguments') in_group.add_argument('input_mode', nargs='?', default=None, metavar='MODE', choices=['query', 'dids', 'files'], help='input mode (query, dids, files, dir)') in_group.add_argument('-f', '--file', action='append', help='a text file with a list of input files') in_group.add_argument('-d', '--dir', action='append', help='a directory to add to search locations') #in_group.add_argument('inputs', nargs=argparse.REMAINDER, help='remaining command line inputs') in_group.add_argument('inputs', nargs='*', help='remaining command line inputs') out_group = parser.add_argument_group('output arguments') out_group.add_argument('--list', choices=['dids', 'replicas', 'pfns'], metavar='OPT', help='list (dids, replicas, pfns) instead of merging') out_group.add_argument('-l', '--local', action='store_true', help='run merge locally instead of submitting to JustIN') args = parser.parse_args() print ("main arguments are: ",args) # Set up logging and configuration name = "merge" if args.list: name = "list "+args.list io_utils.setup_log(name, log_file=args.log, verbosity=args.verbose) config.load(args.config) if args.local: config.output['mode'] = 'local' if args.input_mode: config.inputs['mode'] = args.input_mode input_mode = config.inputs['mode'] logger.info("Input mode: %s", input_mode) # Collect inputs inputs = config.inputs['inputs'] or [] io_utils.log_nonzero("Found {n} input{s} from config files", len(inputs)) if io_utils.log_nonzero("Found {n} input{s} from command line", len(args.inputs)): inputs.extend(args.inputs) inputs.extend(io_utils.get_inputs(args.file)) if len(inputs) == 0: logger.critical("No input provided, exiting.") sys.exit(1) io_utils.log_list("Found {n} total input{s}:", inputs, logging.INFO) # Collect file search directories dirs = config.inputs['search_dirs'] or [] io_utils.log_nonzero("Found {n} search location{s} from config files", len(dirs)) if args.dir: io_utils.log_nonzero("Found {n} search location{s} from command line", len(args.dir)) dirs.extend(args.dir) io_utils.log_list("Found {n} total search location{s}:", dirs, logging.INFO) # Determine input mode and retrieve metadata paths = None metadata = None if input_mode == 'files': paths = local.get_local_files(inputs, dirs) metadata = paths.meta elif input_mode == 'query': from merge_utils.metacat_utils import MetaCatRetriever #pylint: disable=import-outside-toplevel if len(inputs) != 1: logger.critical("Query mode currently only supports a single MetaCat query.") sys.exit(1) metadata = MetaCatRetriever(query=inputs[0]) elif input_mode == 'dids': from merge_utils.metacat_utils import MetaCatRetriever #pylint: disable=import-outside-toplevel metadata = MetaCatRetriever(dids=inputs) else: logger.critical("Unknown input mode: %s", input_mode) sys.exit(1) # If we're only listing DIDs, we can skip the rest of the setup if args.list == 'dids': metadata.run() for file in metadata.files: print(file.did) return # Set up a retriever for physical file locations if needed if not paths: if dirs: logger.info("Searching for local data files in provided directories") paths = local.LocalPathFinder(metadata, dirs=dirs) else: logger.info("No local search directories provided, querying Rucio to find data files") from merge_utils.rucio_utils import RucioFinder #pylint: disable=import-outside-toplevel paths = RucioFinder(metadata) # Process the other list options if args.list: paths.run() if args.list == 'replicas': if input_mode in ['files']: print("Local file paths:") for file in paths.files: print(f" {file.path}") else: for name, rse in paths.rses.items(): print(f"RSE {name}:") for pfn in rse.pfns.values(): print(f" {pfn}") elif args.list == 'pfns': for chunk in paths.output_chunks(): print(f"Output file {chunk.name} (site {chunk.site}):") for pfn in chunk.values(): print(f" {pfn.path}") else: raise ValueError(f"Unknown list option: {args.list}") return # Process merging if args.local: sched = scheduler.LocalScheduler(paths) else: sched = scheduler.JustinScheduler(paths) sched.run()