"""Utility functions for merging metadata for multiple files."""
import os
import sys
import logging
import collections
from merge_utils import config, io_utils
logger = logging.getLogger(__name__)
[docs]
def fix(name: str, metadata: dict) -> None:
    """
    Apply the configured corrections to a metadata dictionary in place.

    Three passes run in order, each driven by ``config.metadata['fixes']``:
    misspelled keys are renamed, missing keys receive a default value, and
    known-bad values are swapped for their replacement. If anything changed,
    the list of changes is handed to ``io_utils.log_list`` with
    ``logging.DEBUG``.

    :param name: name of the file (for logging)
    :param metadata: metadata dictionary, modified in place
    """
    rules = config.metadata['fixes']
    applied = []

    # Pass 1: rename misspelled keys (the value moves to the correct key)
    for wrong, right in rules['keys'].items():
        if wrong not in metadata:
            continue
        applied.append(f"Key '{wrong}' -> '{right}'")
        metadata[right] = metadata.pop(wrong)

    # Pass 2: fill in keys that are absent altogether
    for key, default in rules['missing'].items():
        if key in metadata:
            continue
        applied.append(f"Key '{key}' value None -> '{default}'")
        metadata[key] = default

    # Pass 3: every remaining entry in the fixes table maps bad values to
    # good values for the metadata key of the same name
    for key, replacements in rules.items():
        if key in ('keys', 'missing') or key not in metadata:
            continue
        current = metadata[key]
        if current not in replacements:
            continue
        corrected = replacements[current]
        applied.append(f"Key '{key}' value '{current}' -> '{corrected}'")
        metadata[key] = corrected

    if applied:
        header = "Applying {n} metadata fix{es} to file %s:" % name
        io_utils.log_list(header, applied, logging.DEBUG)
[docs]
def check_required(metadata: dict) -> list:
    """
    Report required keys that are absent from the metadata dictionary.

    Keys listed in ``config.metadata['required']`` are checked first. Then each
    entry of ``config.metadata['conditional']`` is evaluated against the
    metadata through ``MetaNameDict.eval``; when a condition matches, its keys
    are checked too. A key is only checked once, and a missing key that is
    listed in ``config.metadata['optional']`` is not reported.

    :param metadata: metadata dictionary
    :return: list of error messages, one per missing key (empty if none)
    """
    problems = []
    seen = set()

    # Unconditionally required keys
    for key in config.metadata['required']:
        seen.add(key)
        if key not in metadata and key not in config.metadata['optional']:
            problems.append(f"Missing required key: {key}")

    # Keys that are required only when their condition holds
    evaluator = MetaNameDict(metadata)
    for condition, keys in config.metadata['conditional'].items():
        if evaluator.eval(condition):
            logger.debug("Matched condition: %s", condition)
        else:
            logger.debug("Skipping condition: %s", condition)
            continue
        for key in keys:
            if key in seen:
                # Already handled by the required list or an earlier condition
                continue
            seen.add(key)
            if key in metadata:
                continue
            if key in config.metadata['optional']:
                continue
            problems.append(f"Missing conditionally required key: {key} (from {condition})")

    return problems
[docs]
def validate(name: str, metadata: dict, requirements: bool = True) -> bool:
    """
    Fix up and then validate a metadata dictionary.

    The dictionary is first passed through ``fix`` (so it may be modified in
    place). It is then checked for missing required keys (optional), for
    values outside the allowed set of a restricted key, and for values whose
    type name differs from the configured one. An ``int`` is accepted where a
    ``float`` is expected. Keys with a restricted value set are exempt from
    the type check.

    :param name: name of the file (for logging)
    :param metadata: metadata dictionary
    :param requirements: whether to check for required keys
    :return: True if metadata is valid, False otherwise
    """
    fix(name, metadata)

    problems = []
    if requirements:
        problems += check_required(metadata)

    # Restricted keys may only take one of the listed values
    for key, allowed in config.metadata['restricted'].items():
        if key in metadata and metadata[key] not in allowed:
            problems.append(f"Invalid value for {key}: {metadata[key]}")

    # Remaining typed keys are compared by type *name*
    for key, expected_type in config.metadata['types'].items():
        if key not in metadata or key in config.metadata['restricted']:
            continue
        value = metadata[key]
        actual = type(value).__name__
        if expected_type == 'float':
            acceptable = (expected_type, 'int')
        else:
            acceptable = (expected_type,)
        if actual not in acceptable:
            problems.append(f"Invalid type for {key}: {value} (expected {expected_type})")

    if not problems:
        return True

    # Severity depends on whether invalid files are configured to be skipped
    if config.validation['skip']['invalid']:
        level = logging.ERROR
    else:
        level = logging.CRITICAL
    io_utils.log_list("File %s has {n} invalid metadata key{s}:" % name, problems, level)
    return False
# Lookup table from a merge-mode name, as used in config.metadata['merging'],
# to the class that accumulates values for that mode. merged_keys() uses the
# 'default' mode from the configuration as the factory for keys with no
# explicit mode, and falls back to MergeMetaOverride for any mode name that
# is not listed here.
MERGE_META_CLASSES = {
    'unique': MergeMetaUnique,
    'all': MergeMetaAll,
    'min': MergeMetaMin,
    'max': MergeMetaMax,
    'sum': MergeMetaSum,
    'union': MergeMetaUnion,
    'subset': MergeMetaSubset,
    # NOTE(review): disabled entry; presumably 'skip' is handled by the
    # MergeMetaOverride fallback in merged_keys() instead -- confirm.
    #'skip': MergeMetaOverride,
}
[docs]
def merged_keys(files: dict, warn: bool = False) -> dict:
    """
    Merge metadata from multiple files into a single dictionary.

    Each metadata key gets an accumulator object: the class named by the
    key's mode in ``config.metadata['merging']``, ``MergeMetaOverride`` for an
    unrecognised mode, or the configured default class for keys that have no
    explicit mode. Configured overrides and the merging-method settings
    (``merge.method``, ``merge.cmd``, ``merge.script``, ``merge.cfg``) are
    installed as fixed override values. Every file's metadata is then fed into
    the accumulators, and only keys whose accumulator reports ``valid`` are
    kept. The result is passed through ``validate`` without the
    required-key check.

    :param files: set of files to merge (mapping whose values expose ``.metadata``)
    :param warn: whether to warn about inconsistent metadata
    :return: merged metadata
    :raises ValueError: if the merged metadata fails validation
    """
    merging = config.metadata['merging']
    metadata = collections.defaultdict(MERGE_META_CLASSES[merging['default']])

    # Keys with an explicitly configured merge mode
    for key, mode in merging.items():
        if key in ('default', 'overrides'):
            continue
        if mode in MERGE_META_CLASSES:
            metadata[key] = MERGE_META_CLASSES[mode]()
        else:
            metadata[key] = MergeMetaOverride()

    # Keys whose output value is fixed by configuration
    for key, value in config.metadata['overrides'].items():
        metadata[key] = MergeMetaOverride(value)

    # Record how the merge was performed
    method = config.merging['method']
    metadata['merge.method'] = MergeMetaOverride(method['name'])
    for key in ('cmd', 'script', 'cfg'):
        # These settings may not be populated yet: check_method() calls this
        # function before the 'auto' method has been resolved, so a missing
        # key is treated the same as an explicit None.
        val = method.get(key)
        if val is None:
            continue
        if key in ('script', 'cfg'):
            # Only the file name is recorded, not the local path
            val = os.path.basename(val)
        metadata[f'merge.{key}'] = MergeMetaOverride(val)

    # Accumulate the values from every input file
    for file in files.values():
        for key, value in file.metadata.items():
            metadata[key].add(value)

    if warn:
        io_utils.log_list("Omitting {n} inconsistent metadata key{s} from output:",
            [k for k, v in metadata.items() if v.warn]
        )

    metadata = {k: v.value for k, v in metadata.items() if v.valid}
    if not validate("output", metadata, requirements=False):
        logger.critical("Merged metadata is invalid, cannot continue!")
        raise ValueError("Merged metadata is invalid")
    return metadata
[docs]
def parents(files: dict) -> list[dict]:
    """
    Retrieve all the parents from a set of files.

    When ``config.output['grandparents']`` is false, the input files
    themselves are listed, one dictionary per file with the keys ``fid``,
    ``name`` and ``namespace``. Otherwise the entries of every file's
    ``parents`` attribute are collected and de-duplicated; each is returned as
    a dictionary with its keys in sorted order. Duplicates are dropped while
    keeping first-seen order, so the result is the same from run to run.

    :param files: set of files to merge
    :return: list of parent descriptions (dictionaries)
    """
    if not config.output['grandparents']:
        logger.info("Listing direct parents")
        return [
            {
                "fid": file.fid,
                "name": file.name,
                "namespace": file.namespace
            }
            for file in files.values()
        ]

    logger.info("Listing grandparents instead of direct parents")
    # A dict is used as an ordered set: the sorted item tuple is a hashable
    # stand-in for each grandparent dictionary.
    unique = {}
    for file in files.values():
        for grandparent in file.parents:
            unique.setdefault(tuple(sorted(grandparent.items())), None)
    return [dict(items) for items in unique]
[docs]
def set_method_auto(metadata: dict) -> None:
    """
    Auto-select merging method based on metadata conditions.

    The configured methods are scanned from last to first and the first one
    whose ``cond`` expression evaluates true against the metadata is chosen
    (a method without ``cond`` always matches). Its settings are copied into
    ``config.merging['method']``, except where a value was already set
    explicitly; explicit values win and trigger a warning. The process exits
    with status 1 if no method matches.

    :param metadata: metadata dictionary
    """
    # Find the first matching merging method (in reverse order)
    method = {}
    name_dict = MetaNameDict(metadata)
    for mtd in reversed(config.merging['methods']):
        condition = mtd.get('cond', 'True')
        if name_dict.eval(condition):
            if condition == 'True':
                condition = "unconditional"
            logger.info("Auto-selected merging method '%s' (%s)", mtd['name'], condition)
            method = mtd
            break
    if not method:
        logger.critical("Failed to auto-select merging method!")
        sys.exit(1)

    # Set merging method parameters
    settings = config.merging['method']
    settings['name'] = method['name']
    explicit = False
    for key in ['script', 'cmd', 'ext', 'cfg']:
        if key in settings and settings[key] is not None:
            logger.warning("Explicit value for merge.%s overrides %s default", key, method['name'])
            explicit = True
        else:
            settings[key] = method.get(key)

    # Explicit dependencies are kept and the method's own are appended
    if settings['dependencies']:
        logger.warning("Explicitly adding merge.dependencies:\n %s",
                       "\n ".join(settings['dependencies']))
        explicit = True
    settings['dependencies'].extend(method.get('dependencies', []))

    # NOTE: update() lets the method's metadata replace explicit entries that
    # share the same key
    if settings['metadata']:
        logger.warning("Explicitly setting merge.metadata:\n %s",
                       settings['metadata'])
        explicit = True
    settings['metadata'].update(method.get('metadata', {}))

    if explicit:
        logger.warning("Consider specifying an explicit merging method instead of using 'auto'!")
[docs]
def set_method(method: dict) -> None:
    """
    Set merging method parameters from a built-in method definition.

    For ``script``, ``cmd``, ``ext`` and ``cfg``, a value already present in
    ``config.merging['method']`` (and not None) is kept; otherwise the
    built-in method's value is used, or None if it has none. The method's
    dependencies are appended to any explicit ones, and its metadata is merged
    into the explicit metadata.

    :param method: merging method dictionary
    """
    logger.info("Using built-in merging method '%s'", method['name'])
    settings = config.merging['method']
    for key in ['script', 'cmd', 'ext', 'cfg']:
        if key in settings and settings[key] is not None:
            logger.info("Explicit value for merge.%s overrides %s default", key, method['name'])
        else:
            settings[key] = method.get(key)

    if settings['dependencies']:
        logger.info("Explicitly adding merge.dependencies:\n %s",
                    "\n ".join(settings['dependencies']))
    settings['dependencies'].extend(method.get('dependencies', []))

    if settings['metadata']:
        # NOTE(review): this is logged at WARNING while the other explicit
        # settings in this function are logged at INFO -- confirm whether
        # that difference is intentional.
        logger.warning("Explicitly setting merge.metadata:\n %s",
                       settings['metadata'])
    # NOTE: update() lets the method's metadata replace explicit entries that
    # share the same key
    settings['metadata'].update(method.get('metadata', {}))
[docs]
def set_method_custom() -> None:
    """
    Fill in the merging method settings for a user-supplied (custom) method.

    If no script is configured and the command is either absent or contains
    the ``{script}`` placeholder, the method name is taken to be the script
    path and the method name is shortened to that path's base name. All other
    settings that are still missing are initialised to empty defaults.
    """
    settings = config.merging['method']
    name = settings['name']
    # setdefault() without a default inserts None for a missing key
    cmd = settings.setdefault('cmd')
    script = settings.setdefault('script')

    name_is_script = not script and (not cmd or '{script}' in cmd)
    if name_is_script:
        # Assume the name is a script
        settings['script'] = name
        settings['name'] = os.path.basename(name)
    logger.info("Using custom merging method: %s", name)

    for key, default in (('ext', None), ('cfg', None), ('dependencies', []), ('metadata', {})):
        settings.setdefault(key, default)
[docs]
def set_extension(files: dict) -> None:
    """
    Determine the file extension for the merged file, if not already set.

    Does nothing when ``config.merging['method']['ext']`` already has a value.
    Otherwise the extension shared by all input file names is stored there.
    The process exits with status 1 if the inputs do not share exactly one
    extension (this includes an empty set of inputs).

    :param files: set of files to merge (mapping whose values expose ``.name``)
    """
    if config.merging['method']['ext']:
        return

    # Iterate the file objects, not the mapping's keys, as the other
    # functions in this module do.
    extensions = {os.path.splitext(file.name)[-1] for file in files.values()}
    if len(extensions) != 1:
        logger.critical("Cannot determine extension for merged files!")
        sys.exit(1)

    ext = extensions.pop()
    config.merging['method']['ext'] = ext
    logger.info("Auto-detected file extension '%s' from input files", ext)
[docs]
def check_method(files: dict) -> None:
    """
    Check and set the merging method based on the input file metadata.

    Resolves the method (auto-selected, built-in, or custom), replaces the
    dependency list with a de-duplicated list of located paths, sanity-checks
    the merging command, determines the output extension and logs the final
    settings. The process exits with status 1 if the command lacks the
    ``{output}`` or ``{inputs}`` placeholder.

    :param files: set of files to merge
    """
    # Figure out merging method
    name = config.merging['method']['name']
    if name == 'auto':
        set_method_auto(merged_keys(files, warn=False))
    else:
        # Check if we're using a built-in merging method
        methods = [m for m in config.merging['methods'] if m['name'] == name]
        if methods:
            # If several definitions share the name, the last one wins
            set_method(methods[-1])
        else:
            set_method_custom()

    # Convert dependencies to a unique list of full paths. A dict serves as
    # an ordered set so the resulting order (script, cfg, then the listed
    # dependencies) is the same on every run.
    dependencies = {}
    if config.merging['method']['script']:
        dependencies[io_utils.find_runner(config.merging['method']['script'])] = None
    if config.merging['method']['cfg']:
        dependencies[io_utils.find_cfg(config.merging['method']['cfg'])] = None
    for dep in config.merging['method']['dependencies']:
        dependencies[io_utils.find_file(dep, ["config", "src"], recursive=True)] = None
    config.merging['method']['dependencies'] = list(dependencies)

    # Move metadata overrides to the metadata configuration dictionary
    # NOTE(review): this pops 'metadata' from config.merging itself, whereas
    # the set_method* functions store method metadata under
    # config.merging['method']['metadata'] -- confirm which one is intended.
    config.metadata['overrides'].update(config.merging.pop('metadata', {}))

    # Check for issues with the merging command
    cmd = config.merging['method']['cmd']
    if cmd:
        if config.merging['method']['script'] and '{script}' not in cmd:
            logger.warning("Merging command does not call provided '{script}'")
        if config.merging['method']['cfg'] and '{cfg}' not in cmd:
            logger.warning("Merging command does not use provided '{cfg}'")
        if '{output}' not in cmd:
            logger.critical("Merging command does not use required '{output}'")
            sys.exit(1)
        if '{inputs}' not in cmd:
            logger.critical("Merging command does not use required '{inputs}'")
            sys.exit(1)

    # Figure out file extension if not provided
    set_extension(files)

    # Log final merging method configuration
    msg = [f"Final settings for merging method '{config.merging['method']['name']}':"]
    for key in ['cmd', 'script', 'cfg', 'ext']:
        msg.append(f"{key}: {config.merging['method'][key]}")
    msg.append("dependencies:")
    msg.extend([f" {dep}" for dep in config.merging['method']['dependencies']])
    logger.info("\n ".join(msg))
[docs]
def make_name(files: dict) -> str:
    """
    Finalise the merging method and build a name for the merged output.

    The merging method is resolved first because its settings feed into the
    merged metadata, which in turn fills the configured name template.

    :param files: set of files to merge
    :return: merged file name, suffixed with ``_merged_`` and a timestamp
    """
    check_method(files)
    # Recalculate with the now-final method settings
    merged = merged_keys(files, warn=True)
    formatter = MetaNameDict(merged)
    base = formatter.format(config.output['name'])
    stamp = io_utils.get_timestamp()
    return f"{base}_merged_{stamp}"