Source code for merge_utils.metacat_utils

"""Utility functions for interacting with the MetaCat web API."""

import logging
import itertools
import asyncio
import time
from typing import AsyncGenerator

import metacat.webapi as metacat

from merge_utils import config
from merge_utils.retriever import FileRetriever

logger = logging.getLogger(__name__)

[docs] class MetaCatRetriever(FileRetriever): """Class for managing asynchronous queries to the MetaCat web API.""" def __init__(self, query: str = None, filelist: list = None): """ Initialize the MetaCatRetriever with a query or a list of files. :param query: MQL query to find files :param filelist: list of file DIDs to find """ super().__init__() self.query = query self.filelist = filelist self.parents = config.output['grandparents'] if query and filelist: logger.warning("Both query and file list provided, was this intended?") if not self.filelist: self.filelist = [] self.client = None
[docs] async def connect(self) -> None: """Connect to the MetaCat web API""" if not self.client: logger.debug("Connecting to MetaCat") self.client = await asyncio.to_thread(metacat.MetaCatClient) else: logger.debug("Already connected to MetaCat")
async def _get_files(self, idx: int, dids: list) -> list: """ Asynchronously request file data from MetaCat :param idx: batch number (for logging) :param dids: list of DIDs to request :return: list of results from the request """ if len(dids) == 0: return [] logger.info("Retrieving files from MetaCat for batch %d", idx) dictlist = [{'did':did} for did in dids] logger.debug("Requesting dids:\n %s", "\n ".join([f['did'] for f in dictlist])) try: res = await asyncio.to_thread(self.client.get_files, dictlist, with_metadata = True, with_provenance = self.parents) except (ValueError, metacat.webapi.BadRequestError) as err: logger.critical("%s", err) raise ValueError(f"Failed to retrieve files from MetaCat for batch {idx}: {err}") from err return list(res) async def _get_query(self, idx: int) -> list: """ Asynchronously query MetaCat :param idx: batch number to query :return: list of results from the query """ if not self.query: return [] logger.info("Querying MetaCat for batch %d", idx) query_batch = self.query + f" skip {idx*self.step} limit {self.step}" logger.debug("Query: %s", query_batch) try: # async_query exists but does not seem to be compatible with asyncIO res = await asyncio.to_thread(self.client.query, query_batch, with_metadata = True, with_provenance = self.parents) except metacat.webapi.BadRequestError as err: logger.critical("Malformed MetaCat query:\n %s\n%s", self.query, err) raise ValueError(f"Failed to query MetaCat for batch {idx}: {err}") from err return list(res)
[docs] async def input_batches(self) -> AsyncGenerator[dict, None]: """ Asynchronously retrieve metadata for the next batch of files. :return: dict of MergeFile objects that were added """ # request first batch from filelist dids = self.filelist[0:self.step] task = asyncio.create_task(self._get_files(0, dids)) # loop over batches from filelist for idx in range(1, len(self.filelist)//self.step + 1): old_dids = dids dids = self.filelist[idx*self.step:(idx+1)*self.step] res = await task task = asyncio.create_task(self._get_files(idx, dids)) added = await self.add(res, old_dids) logger.debug("yielding file batch %d", idx-1) yield added res = await task # request first batch from query task = asyncio.create_task(self._get_query(0)) # finish processing last batch from filelist if dids: added = await self.add(res, dids) logger.debug("yielding last file batch") yield added # loop over batches from query for idx in itertools.count(1): res = await task if len(res) < self.step: break task = asyncio.create_task(self._get_query(idx)) added = await self.add(res) logger.debug("yielding query batch %d", idx-1) yield added # yield last partial batch from query if res: added = await self.add(res) logger.debug("yielding last query batch") yield added
[docs] def list_field_values(field: str) -> list: """ Get a list of all values for a given field in the MetaCat database. :param field: field to query :return: list of values for the field """ client = metacat.MetaCatClient() query = f"files where {field} present limit 1" values = [] while True: res = client.query(query, with_metadata=True) data = next(res, None) if not data: break value = data['metadata'][field] print(value) values.append(value) query = f"""files where {field} present and {field} not in ('{"','".join(values)}') limit 1""" #time.sleep(1) return values
[docs] def list_extensions() -> list: """ Get a list of all file extensions in the MetaCat database. :return: list of file extensions """ client = metacat.MetaCatClient() query = "files where name ~ '\\.[a-z]' limit 1" and_name = "' and name !~ '\\." values = [] while True: res = client.query(query, with_metadata=False) data = next(res, None) if not data: break ext = data['name'].split('.')[-1] print(data['namespace']+":"+data['name']) values.append(ext) query = f"files where name ~ '\\.[a-z]{and_name}{and_name.join(values)}' limit 1" return values