services/ui_backend_service/data/refiner/parameter_refiner.py (27 lines of code) (raw):

from .refinery import Refinery


class ParameterRefiner(Refinery):
    """
    Refiner class for postprocessing Run parameters.

    Uses Metaflow Client API to refine Run parameters from Metaflow Datastore.

    Parameters
    -----------
    cache : AsyncCacheClient
        An instance of a cache that implements the GetParameters action.
    """

    def __init__(self, cache):
        super().__init__(cache=cache)

    def _action(self):
        """Return the cache action (``GetParameters``) used to fetch run parameters."""
        # NOTE(review): `cache_store` is not set in this class; presumably the
        # Refinery base class assigns it from `cache` -- confirm.
        return self.cache_store.cache.GetParameters

    async def fetch_data(self, targets, event_stream=None, invalidate_cache=False):
        """
        Run the GetParameters action for the given targets and return its result.

        Parameters
        -----------
        targets
            Passed through unchanged to the GetParameters action.
            Presumably the strings built by `_record_to_action_input`
            -- verify against the caller.
        event_stream
            Not used by this implementation; kept in the signature.
        invalidate_cache : bool
            Forwarded to the GetParameters action as a keyword argument.

        Returns
        -----------
        The value of the action result's `get()`, or an empty dict when that
        value is falsy (e.g. None).

        Raises
        -----------
        GetParametersFailed
            When the action's event stream yields an event of type "error".
        """
        _res = await self._action()(targets, invalidate_cache=invalidate_cache)

        if _res.has_pending_request():
            async for event in _res.stream():
                if event["type"] == "error":
                    # raise error, there was an exception during processing.
                    raise GetParametersFailed(event["message"], event["id"], event["traceback"])
            await _res.wait()  # wait for results to be ready

        return _res.get() or {}  # cache get() might return None if no keys are produced.

    def _record_to_action_input(self, record):
        """
        Build the "<flow_id>/<run_id>" string for a record.

        `run_id` is preferred; `run_number` is used when `run_id` is missing
        or falsy.
        """
        # Prefer run_id over run_number
        return "{flow_id}/{run_id}".format(
            flow_id=record['flow_id'],
            run_id=record.get('run_id') or record['run_number'])

    async def refine_record(self, record, values):
        """
        Wrap every fetched value as ``{'value': <value>}``, keyed as in `values`.

        `record` is not read here; the returned dict is built from `values` only.
        """
        return {k: {'value': v} for k, v in values.items()}


class GetParametersFailed(Exception):
    """
    Raised when the GetParameters action reports an error event.

    Attributes are set from the constructor arguments:
    `message`, `id` and `traceback_str`.
    """

    # `id` shadows the builtin, but it is part of the public signature and may
    # be passed by keyword, so the parameter name is kept.
    def __init__(self, msg="Failed to Get Parameters", id="failed-to-get-parameters", traceback_str=None):
        # Initialise the base Exception so that `args` (and therefore repr()
        # and tools that inspect `args`) carry the message instead of being empty.
        super().__init__(msg)
        self.message = msg
        self.id = id
        self.traceback_str = traceback_str

    def __str__(self):
        return self.message