services/ui_backend_service/data/db/tables/step.py
from typing import List, Tuple
from .base import AsyncPostgresTable, OLD_RUN_FAILURE_CUTOFF_TIME
from ..models import StepRow
from services.data.db_utils import DBResponse, DBPagination
# use schema constants from the .data module to keep things consistent
from services.data.postgres_async_db import (
AsyncRunTablePostgres as MetadataRunTable,
AsyncStepTablePostgres as MetadataStepTable,
AsyncTaskTablePostgres as MetadataTaskTable,
AsyncArtifactTablePostgres as MetadataArtifactTable,
AsyncMetadataTablePostgres as MetaMetadataTable
)
class AsyncStepTablePostgres(AsyncPostgresTable):
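    """
    Step table for the UI backend. Reuses the schema constants of the metadata
    service's step table and augments each row with a computed 'duration' column,
    derived from the lateral joins defined below.
    """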
step_dict = {}
run_to_step_dict = {}
_row_type = StepRow
table_name = MetadataStepTable.table_name
keys = MetadataStepTable.keys
primary_keys = MetadataStepTable.primary_keys
trigger_keys = MetadataStepTable.trigger_keys
run_table_name = MetadataRunTable.table_name
task_table_name = MetadataTaskTable.table_name
artifact_table_name = MetadataArtifactTable.table_name
metadata_table_name = MetaMetadataTable.table_name
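    # Lateral joins that look up, per step, the most recent signs of activity;
    # join_columns below combines these into the computed 'duration'.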
joins = [
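        # Latest task heartbeat observed for this step.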
"""
LEFT JOIN LATERAL (
SELECT last_heartbeat_ts as heartbeat_ts
FROM {task_table}
WHERE {table_name}.flow_id={task_table}.flow_id
AND {table_name}.run_number={task_table}.run_number
AND {table_name}.step_name={task_table}.step_name
ORDER BY last_heartbeat_ts DESC
LIMIT 1
) AS latest_task_hb ON true
""".format(
table_name=table_name,
task_table=task_table_name
),
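        # Timestamp of the newest '_task_ok' artifact, i.e. the latest task completion.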
"""
LEFT JOIN LATERAL (
SELECT ts_epoch as ts_epoch
FROM {artifact_table}
WHERE {table_name}.flow_id={artifact_table}.flow_id
AND {table_name}.run_number={artifact_table}.run_number
AND {table_name}.step_name={artifact_table}.step_name
AND {artifact_table}.name = '_task_ok'
ORDER BY
ts_epoch DESC
LIMIT 1
) AS latest_task_ok ON true
""".format(
table_name=table_name,
artifact_table=artifact_table_name
),
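        # Timestamp of the newest 'attempt_ok' or 'attempt-done' metadata entry.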
"""
LEFT JOIN LATERAL (
SELECT ts_epoch as ts_epoch
FROM {metadata_table}
WHERE {table_name}.flow_id={metadata_table}.flow_id
AND {table_name}.run_number={metadata_table}.run_number
AND {table_name}.step_name={metadata_table}.step_name
AND (
{metadata_table}.field_name = 'attempt_ok' OR
{metadata_table}.field_name = 'attempt-done'
)
ORDER BY
ts_epoch DESC
LIMIT 1
) AS latest_metadata_done ON true
""".format(
table_name=table_name,
metadata_table=metadata_table_name
)
]
@property
def select_columns(self):
        # NOTE: this must be a property (function scope) so that the list comprehension
        # can access table_name and keys; class attributes are not visible inside
        # comprehensions written directly in the class body.
return ["{table_name}.{col} AS {col}".format(table_name=self.table_name, col=k) for k in self.keys]
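
    # Computed 'duration' column: if any of the lateral joins above produced a
    # timestamp, duration is the newest of them (heartbeats are converted from
    # seconds to milliseconds) minus the step's ts_epoch. Otherwise, steps older
    # than OLD_RUN_FAILURE_CUTOFF_TIME get NULL, and newer steps report the time
    # elapsed since ts_epoch.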
join_columns = [
"""
(CASE
WHEN COALESCE(latest_task_ok, latest_metadata_done, latest_task_hb) IS NOT NULL
THEN GREATEST(
latest_task_ok.ts_epoch,
latest_metadata_done.ts_epoch,
latest_task_hb.heartbeat_ts*1000
) - {table_name}.ts_epoch
WHEN @(extract(epoch from now())::bigint*1000) - {table_name}.ts_epoch > {cutoff}
THEN NULL
ELSE @(extract(epoch from now())::bigint*1000) - {table_name}.ts_epoch
END) as duration
""".format(
table_name=table_name,
cutoff=OLD_RUN_FAILURE_CUTOFF_TIME
)
]
async def get_step_names(self, conditions: List[str] = [],
values: List[str] = [], limit: int = 0, offset: int = 0) -> Tuple[DBResponse, DBPagination]:
"""
Get a paginated set of step names.
Parameters
----------
conditions : List[str]
list of conditions to pass the sql execute, with %s placeholders for values
values : List[str]
list of values to be passed for the sql execute.
limit : int (optional) (default 0)
limit for the number of results
offset : int (optional) (default 0)
offset for the results.
Returns
-------
(DBResponse, DBPagination)
"""
sql_template = """
SELECT step_name FROM (
SELECT DISTINCT step_name, flow_id, run_number, run_id
FROM {table_name}
) T
{conditions}
{limit}
{offset}
"""
select_sql = sql_template.format(
table_name=self.table_name,
keys=",".join(self.select_columns),
conditions=("WHERE {}".format(" AND ".join(conditions)) if conditions else ""),
limit="LIMIT {}".format(limit) if limit else "",
offset="OFFSET {}".format(offset) if offset else ""
)
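        # serialize=False so the response body contains raw row tuples (only the
        # step_name column is extracted below); limit/offset are passed along for
        # the pagination metadata.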
res, pag = await self.execute_sql(select_sql=select_sql, values=values, fetch_single=False,
expanded=False,
limit=limit, offset=offset, serialize=False)
# process the unserialized DBResponse
_body = [row[0] for row in res.body]
return DBResponse(res.response_code, _body), pag
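
# Illustrative usage (hypothetical flow/run values; assumes an initialized table
# instance inside an async context):
#
#   res, pagination = await step_table.get_step_names(
#       conditions=["flow_id = %s", "run_number = %s"],
#       values=["HelloFlow", "42"],
#       limit=10,
#   )
#   # res.body is a list of step names, e.g. ["start", "train", "end"]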