services/ui_backend_service/data/refiner/task_refiner.py (25 lines of code) (raw):

from .refinery import Refinery class TaskRefiner(Refinery): """ Refiner class for postprocessing Task rows. Uses Metaflow Client API to refine Task's actual status from Metaflow Service and Datastore. Parameters ----------- cache : AsyncCacheClient An instance of a cache that implements the GetTask action. """ def __init__(self, cache): super().__init__(cache=cache) def _action(self): return self.cache_store.cache.GetTask def _record_to_action_input(self, record): # Prefer run_id over run_number # Prefer task_name over task_id return "{flow_id}/{run_id}/{step_name}/{task_name}/{attempt_id}".format( flow_id=record['flow_id'], run_id=record.get('run_id') or record['run_number'], step_name=record['step_name'], task_name=record.get('task_name') or record['task_id'], attempt_id=record['attempt_id']) async def refine_record(self, record, values): if record['status'] == 'unknown' and values.get('_task_ok') is not None: value = values['_task_ok'] if value is False: record['status'] = 'failed' elif value is True: record['status'] = 'completed' if values.get('_foreach_stack'): value = values['_foreach_stack'] if len(value) > 0 and len(value[0]) >= 4: # The third one in the tuple is the foreach index. We access this way for backwards compatibility. record['foreach_label'] = "{}[{}]".format(record['task_id'], value[0][3]) return record