"""Run table for the UI backend service DB layer.

Extends the generic ``AsyncPostgresTable`` with run-specific joins and
computed columns: ``status``, ``finished_at`` and ``duration`` are not stored
in the OSS schema, so they are derived in SQL from the 'end' step's
``attempt_ok``/``attempt`` metadata rows and from the run heartbeat.
"""

import os
import time
from typing import List, Optional, Tuple

from .base import (
    AsyncPostgresTable,
    OLD_RUN_FAILURE_CUTOFF_TIME,
    RUN_INACTIVE_CUTOFF_TIME,
)
from ..models import RunRow
from services.data.db_utils import DBResponse, DBPagination, translate_run_key

# use schema constants from the .data module to keep things consistent
from services.data.postgres_async_db import (
    AsyncRunTablePostgres as MetadataRunTable,
    AsyncMetadataTablePostgres as MetaMetadataTable,
    AsyncArtifactTablePostgres as MetadataArtifactTable,
    AsyncTaskTablePostgres as MetadataTaskTable,
)

# Prefetch runs since 2 days ago (in seconds), limit maximum of 50 runs.
# NOTE: values read from the environment are strings, so cast to int here
# once instead of at every use site (the original only cast SINCE, leaving
# LIMIT as a string whenever PREFETCH_RUNS_LIMIT was set in the environment).
METAFLOW_ARTIFACT_PREFETCH_RUNS_SINCE = int(
    os.environ.get("PREFETCH_RUNS_SINCE", 86400 * 2)
)
METAFLOW_ARTIFACT_PREFETCH_RUNS_LIMIT = int(os.environ.get("PREFETCH_RUNS_LIMIT", 50))


class AsyncRunTablePostgres(AsyncPostgresTable):
    """Read access to run records, enriched with computed run state columns."""

    _row_type = RunRow
    table_name = MetadataRunTable.table_name
    metadata_table = MetaMetadataTable.table_name
    artifact_table = MetadataArtifactTable.table_name
    task_table = MetadataTaskTable.table_name
    keys = MetadataRunTable.keys
    primary_keys = MetadataRunTable.primary_keys
    trigger_keys = MetadataRunTable.trigger_keys
    trigger_operations = ["INSERT"]

    # NOTE: OSS Schema has metadata value column as TEXT, but for the time being
    # we also need to support value columns of type jsonb, which is why there is
    # additional logic when dealing with 'value'
    joins = [
        # Latest 'attempt_ok' metadata row of the 'end' step: tells whether the
        # final attempt succeeded (value is cast to boolean from TEXT or jsonb).
        """
        LEFT JOIN LATERAL (
            SELECT
                ts_epoch,
                (CASE
                    WHEN pg_typeof(value)='jsonb'::regtype
                    THEN value::jsonb->>0
                    ELSE value::text
                END)::boolean as value
            FROM {metadata_table} as attempt_ok
            WHERE
                {table_name}.flow_id = attempt_ok.flow_id AND
                {table_name}.run_number = attempt_ok.run_number AND
                attempt_ok.step_name = 'end' AND
                attempt_ok.field_name = 'attempt_ok'
            ORDER BY ts_epoch DESC
            LIMIT 1
        ) as end_attempt_ok ON true
        """.format(
            table_name=table_name, metadata_table=metadata_table
        ),
        # Latest 'attempt' metadata row of the 'end' step, fetched only when the
        # last attempt failed (end_attempt_ok.value IS FALSE): a newer 'attempt'
        # than the failed 'attempt_ok' means a retry is in flight.
        """
        LEFT JOIN LATERAL (
            SELECT ts_epoch
            FROM {metadata_table} as attempt
            WHERE
                {table_name}.flow_id = attempt.flow_id AND
                {table_name}.run_number = attempt.run_number AND
                attempt.step_name = 'end' AND
                attempt.field_name = 'attempt' AND
                end_attempt_ok.value IS FALSE
            ORDER BY ts_epoch DESC
            LIMIT 1
        ) as end_attempt ON true
        """.format(
            table_name=table_name, metadata_table=metadata_table
        ),
    ]

    @property
    def select_columns(self):
        # NOTE: We must use a function scope in order to be able to access the
        # table_name variable for list comprehension.
        # User should be considered NULL when 'user:*' tag is missing
        # This is usually the case with AWS Step Functions
        return (
            [
                "{table_name}.{col} AS {col}".format(table_name=self.table_name, col=k)
                for k in self.keys
            ]
            + [
                """
                (CASE
                    WHEN system_tags ? ('user:' || user_name)
                    THEN user_name
                    ELSE NULL
                END) AS user
                """
            ]
            + [
                # 'run' is the user-facing identifier: run_id when present,
                # otherwise the numeric run_number rendered as text.
                """
                COALESCE({table_name}.run_id, {table_name}.run_number::text) AS run
                """.format(
                    table_name=self.table_name
                )
            ]
        )

    # Computed columns. NOTE(review): '@' is the PostgreSQL absolute-value
    # operator, so @(extract(epoch from now()) - last_heartbeat_ts) is the age
    # of the last heartbeat in seconds.
    join_columns = [
        # finished_at: NULL while still running; otherwise the end-step
        # completion timestamp, falling back to the last heartbeat (ms).
        """
        (CASE
            WHEN end_attempt IS NOT NULL
                AND end_attempt_ok.ts_epoch < end_attempt.ts_epoch
            THEN NULL
            WHEN end_attempt_ok IS NOT NULL
            THEN end_attempt_ok.ts_epoch
            WHEN {table_name}.last_heartbeat_ts IS NOT NULL
                AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)<={heartbeat_cutoff}
            THEN NULL
            ELSE {table_name}.last_heartbeat_ts*1000
        END) AS finished_at
        """.format(
            table_name=table_name,
            heartbeat_cutoff=RUN_INACTIVE_CUTOFF_TIME,
        ),
        # status: a newer 'attempt' than a failed 'attempt_ok' means a retry is
        # running; otherwise attempt_ok decides completed/failed; otherwise a
        # recent heartbeat means running, and a stale one means failed.
        """
        (CASE
            WHEN end_attempt IS NOT NULL
                AND end_attempt_ok.ts_epoch < end_attempt.ts_epoch
            THEN 'running'
            WHEN end_attempt_ok IS NOT NULL AND end_attempt_ok.value IS TRUE
            THEN 'completed'
            WHEN end_attempt_ok IS NOT NULL AND end_attempt_ok.value IS FALSE
            THEN 'failed'
            WHEN {table_name}.last_heartbeat_ts IS NOT NULL
                AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)<={heartbeat_cutoff}
            THEN 'running'
            ELSE 'failed'
        END) AS status
        """.format(
            table_name=table_name,
            heartbeat_cutoff=RUN_INACTIVE_CUTOFF_TIME,
        ),
        # duration (ms): end-step or heartbeat timestamp minus run start;
        # NULL for heartbeat-less runs older than the failure cutoff.
        """
        (CASE
            WHEN end_attempt IS NOT NULL
                AND end_attempt_ok.ts_epoch < end_attempt.ts_epoch
                AND {table_name}.last_heartbeat_ts IS NOT NULL
            THEN {table_name}.last_heartbeat_ts*1000-{table_name}.ts_epoch
            WHEN end_attempt_ok IS NOT NULL
            THEN end_attempt_ok.ts_epoch - {table_name}.ts_epoch
            WHEN {table_name}.last_heartbeat_ts IS NOT NULL
            THEN {table_name}.last_heartbeat_ts*1000-{table_name}.ts_epoch
            WHEN {table_name}.last_heartbeat_ts IS NULL
                AND @(extract(epoch from now())::bigint*1000-{table_name}.ts_epoch)>{cutoff}
            THEN NULL
            ELSE @(extract(epoch from now())::bigint*1000-{table_name}.ts_epoch)
        END) AS duration
        """.format(
            table_name=table_name,
            cutoff=OLD_RUN_FAILURE_CUTOFF_TIME,
        ),
    ]

    async def get_recent_runs(self):
        """
        Fetch the most recent runs within the artifact prefetch window.

        Returns
        -------
        List[Dict]
            At most METAFLOW_ARTIFACT_PREFETCH_RUNS_LIMIT expanded run records
            started within the last METAFLOW_ARTIFACT_PREFETCH_RUNS_SINCE
            seconds, newest first.
        """
        _records, *_ = await self.find_records(
            conditions=["ts_epoch >= %s"],
            values=[
                # ts_epoch is in milliseconds; the prefetch window is seconds.
                int(round(time.time() * 1000))
                - METAFLOW_ARTIFACT_PREFETCH_RUNS_SINCE * 1000
            ],
            order=["ts_epoch DESC"],
            limit=METAFLOW_ARTIFACT_PREFETCH_RUNS_LIMIT,
            expanded=True,
        )
        return _records.body

    async def get_run(self, flow_id: str, run_key: str):
        """
        Fetch run with a given flow_id and run id or number from the DB.

        Parameters
        ----------
        flow_id : str
            flow_id
        run_key : str
            run number or run id

        Returns
        -------
        DBResponse
            Containing a single run record, if one was found.
        """
        # translate_run_key picks the correct column (run_number vs run_id)
        # depending on whether the key is numeric.
        run_id_key, run_id_value = translate_run_key(run_key)
        result, *_ = await self.find_records(
            conditions=[
                "flow_id = %s",
                "{run_id_key} = %s".format(run_id_key=run_id_key),
            ],
            values=[flow_id, run_id_value],
            fetch_single=True,
            enable_joins=True,
        )
        return result

    async def get_expanded_run(self, run_key: str) -> DBResponse:
        """
        Fetch run with a given id or number from the DB.

        Parameters
        ----------
        run_key : str
            run number or run id

        Returns
        -------
        DBResponse
            Containing a single run record, if one was found.
        """
        run_id_key, run_id_value = translate_run_key(run_key)
        result, *_ = await self.find_records(
            conditions=["{column} = %s".format(column=run_id_key)],
            values=[run_id_value],
            fetch_single=True,
            enable_joins=True,
            expanded=True,
        )
        return result

    async def get_run_keys(
        self,
        conditions: Optional[List[str]] = None,
        values: Optional[List[str]] = None,
        limit: int = 0,
        offset: int = 0,
    ) -> Tuple[DBResponse, DBPagination]:
        """
        Get a paginated set of run keys.

        Parameters
        ----------
        conditions : List[str], optional
            list of conditions to pass the sql execute, with %s placeholders
            for values
        values : List[str], optional
            list of values to be passed for the sql execute.
        limit : int (optional) (default 0)
            limit for the number of results
        offset : int (optional) (default 0)
            offset for the results.

        Returns
        -------
        (DBResponse, DBPagination)
        """
        # None defaults instead of mutable [] defaults (shared-state pitfall).
        conditions = conditions if conditions is not None else []
        values = values if values is not None else []

        sql_template = """
            SELECT run FROM (
                SELECT DISTINCT COALESCE(run_id, run_number::text) as run, flow_id
                FROM {table_name}
            ) T
            {conditions}
            {limit}
            {offset}
            """
        select_sql = sql_template.format(
            table_name=self.table_name,
            conditions=(
                "WHERE {}".format(" AND ".join(conditions)) if conditions else ""
            ),
            limit="LIMIT {}".format(limit) if limit else "",
            offset="OFFSET {}".format(offset) if offset else "",
        )

        res, pag = await self.execute_sql(
            select_sql=select_sql,
            values=values,
            fetch_single=False,
            expanded=False,
            limit=limit,
            offset=offset,
            serialize=False,
        )
        # process the unserialized DBResponse: keep only the first column
        # ('run') of each row.
        _body = [row[0] for row in res.body]

        return DBResponse(res.response_code, _body), pag