# services/ui_backend_service/data/db/tables/task.py
from .base import AsyncPostgresTable, HEARTBEAT_THRESHOLD, WAIT_TIME, OLD_RUN_FAILURE_CUTOFF_TIME
from .step import AsyncStepTablePostgres
from ..models import TaskRow
from services.data.db_utils import DBPagination, DBResponse, translate_run_key, translate_task_key
# Use schema constants from the services.data module to keep things consistent.
from services.data.postgres_async_db import (
AsyncTaskTablePostgres as MetadataTaskTable,
AsyncArtifactTablePostgres as MetadataArtifactTable,
AsyncMetadataTablePostgres as MetaMetadataTable
)
from typing import List, Callable, Optional, Tuple
import json
import datetime


class AsyncTaskTablePostgres(AsyncPostgresTable):
_row_type = TaskRow
table_name = MetadataTaskTable.table_name
artifact_table = MetadataArtifactTable.table_name
metadata_table = MetaMetadataTable.table_name
keys = MetadataTaskTable.keys
primary_keys = MetadataTaskTable.primary_keys
trigger_keys = MetadataTaskTable.trigger_keys
trigger_operations = ["INSERT"]
    # NOTE: There is a lot of unfortunate backwards-compatibility logic for cases
    # where task metadata or artifacts have not been stored correctly.
    # NOTE: The OSS schema stores the metadata 'value' column as TEXT, but for the
    # time being we also need to support 'value' columns of type jsonb, which is
    # why there is additional logic when dealing with 'value'.
joins = [
"""
LEFT JOIN LATERAL (
SELECT
max(started_at) as started_at,
max(attempt_finished_at) as attempt_finished_at,
max(task_ok_finished_at) as task_ok_finished_at,
max(task_ok_location) as task_ok_location,
attempt_id :: int as attempt_id,
max(attempt_ok) :: boolean as attempt_ok,
task_id
FROM (
SELECT
task_id,
ts_epoch as started_at,
NULL::bigint as attempt_finished_at,
NULL::bigint as task_ok_finished_at,
NULL::text as task_ok_location,
NULL::text as attempt_ok,
(CASE
WHEN pg_typeof(value)='jsonb'::regtype
THEN value::jsonb->>0
ELSE value::text
END)::int as attempt_id
FROM {metadata_table} as meta
WHERE
{table_name}.flow_id = meta.flow_id AND
{table_name}.run_number = meta.run_number AND
{table_name}.step_name = meta.step_name AND
{table_name}.task_id = meta.task_id AND
meta.field_name = 'attempt'
UNION
SELECT
task_id,
NULL as started_at,
ts_epoch as attempt_finished_at,
NULL as task_ok_finished_at,
NULL as task_ok_location,
NULL as attempt_ok,
(CASE
WHEN pg_typeof(value)='jsonb'::regtype
                    THEN value::jsonb->>0
ELSE value::text
END)::int as attempt_id
FROM {metadata_table} as meta
WHERE
{table_name}.flow_id = meta.flow_id AND
{table_name}.run_number = meta.run_number AND
{table_name}.step_name = meta.step_name AND
{table_name}.task_id = meta.task_id AND
meta.field_name = 'attempt-done'
UNION
SELECT
task_id,
NULL as started_at,
ts_epoch as attempt_finished_at,
NULL as task_ok_finished_at,
NULL as task_ok_location,
(CASE
WHEN pg_typeof(value)='jsonb'::regtype
THEN value::jsonb->>0
ELSE value::text
END) as attempt_ok,
(regexp_matches(tags::text, 'attempt_id:(\\d+)'))[1]::int as attempt_id
FROM {metadata_table} as meta
WHERE
{table_name}.flow_id = meta.flow_id AND
{table_name}.run_number = meta.run_number AND
{table_name}.step_name = meta.step_name AND
{table_name}.task_id = meta.task_id AND
meta.field_name = 'attempt_ok'
UNION
SELECT
task_id,
NULL as started_at,
NULL as attempt_finished_at,
ts_epoch as task_ok_finished_at,
location as task_ok_location,
NULL as attempt_ok,
attempt_id as attempt_id
FROM {artifact_table} as task_ok
WHERE
{table_name}.flow_id = task_ok.flow_id AND
{table_name}.run_number = task_ok.run_number AND
{table_name}.step_name = task_ok.step_name AND
{table_name}.task_id = task_ok.task_id AND
task_ok.name = '_task_ok'
) a
WHERE a.attempt_id IS NOT NULL
GROUP BY a.task_id, a.attempt_id
) as attempt ON true
LEFT JOIN LATERAL (
SELECT ts_epoch
FROM {metadata_table} as next_attempt_start
WHERE
{table_name}.flow_id = next_attempt_start.flow_id AND
{table_name}.run_number = next_attempt_start.run_number AND
{table_name}.step_name = next_attempt_start.step_name AND
{table_name}.task_id = next_attempt_start.task_id AND
next_attempt_start.field_name = 'attempt' AND
(attempt.attempt_id + 1) = (next_attempt_start.value::jsonb->>0)::int
LIMIT 1
) as next_attempt_start ON true
""".format(
table_name=table_name,
metadata_table=metadata_table,
artifact_table=artifact_table
),
]
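
    # The first lateral join above collapses the per-attempt metadata rows
    # (field_names 'attempt', 'attempt-done' and 'attempt_ok') and the
    # '_task_ok' artifact into one row per (task_id, attempt_id). A sketch of
    # the resulting shape, with purely illustrative values:
    #
    #   task_id | attempt_id | started_at | attempt_finished_at | attempt_ok
    #   --------+------------+------------+---------------------+-----------
    #        42 |          0 | 1690000000 |          1690000123 | true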

    @property
    def select_columns(self):
        # NOTE: this must be a property (function scope), because a list
        # comprehension in a class body cannot access other class attributes
        # such as table_name and keys.
        return ["{table_name}.{col} AS {col}".format(table_name=self.table_name, col=k) for k in self.keys]
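    # For example, assuming table_name resolves to something like 'tasks_v3'
    # (illustrative), select_columns yields entries such as
    # ["tasks_v3.flow_id AS flow_id", "tasks_v3.run_number AS run_number", ...].
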
join_columns = [
"COALESCE(attempt.attempt_id, 0) as attempt_id",
"attempt.started_at as started_at",
"""
(CASE
WHEN {finished_at_column} IS NULL
AND {table_name}.last_heartbeat_ts IS NOT NULL
AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)>{heartbeat_threshold}
THEN {table_name}.last_heartbeat_ts*1000
ELSE {finished_at_column}
END) as finished_at
""".format(
table_name=table_name,
heartbeat_threshold=HEARTBEAT_THRESHOLD,
finished_at_column="COALESCE(GREATEST(attempt.attempt_finished_at, attempt.task_ok_finished_at), next_attempt_start.ts_epoch)"
),
"attempt.attempt_ok as attempt_ok",
        # If 'attempt_ok' is present, we can leave task_ok NULL, since task_ok is
        # only needed to fetch the artifact value from its remote location.
        # That fetch is performed by the TaskRefiner (data_refiner.py).
"""
(CASE
WHEN attempt.attempt_ok IS NOT NULL
THEN NULL
ELSE attempt.task_ok_location
END) as task_ok
""",
"""
(CASE
WHEN attempt.attempt_ok IS TRUE
THEN 'completed'
WHEN attempt.attempt_ok IS FALSE
THEN 'failed'
WHEN COALESCE(attempt.attempt_finished_at, attempt.task_ok_finished_at) IS NOT NULL
                AND attempt.attempt_ok IS NULL
THEN 'unknown'
WHEN COALESCE(attempt.attempt_finished_at, attempt.task_ok_finished_at) IS NOT NULL
THEN 'completed'
WHEN next_attempt_start.ts_epoch IS NOT NULL
THEN 'failed'
WHEN {table_name}.last_heartbeat_ts IS NOT NULL
AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)>{heartbeat_threshold}
AND {finished_at_column} IS NULL
THEN 'failed'
WHEN {table_name}.last_heartbeat_ts IS NULL
AND @(extract(epoch from now())*1000 - COALESCE(attempt.started_at, {table_name}.ts_epoch))>{cutoff}
AND {finished_at_column} IS NULL
THEN 'failed'
WHEN {table_name}.last_heartbeat_ts IS NULL
AND attempt IS NULL
THEN 'pending'
ELSE 'running'
END) AS status
""".format(
table_name=table_name,
heartbeat_threshold=HEARTBEAT_THRESHOLD,
finished_at_column="COALESCE(attempt.attempt_finished_at, attempt.task_ok_finished_at)",
cutoff=OLD_RUN_FAILURE_CUTOFF_TIME
),
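        # Duration mirrors the liveness rules of the status column above: NULL
        # for stale or never-started tasks, otherwise the first known end point
        # (attempt finish, next attempt start, last heartbeat, or now) minus
        # the attempt start.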
"""
(CASE
WHEN {table_name}.last_heartbeat_ts IS NULL
AND @(extract(epoch from now())*1000 - COALESCE(attempt.started_at, {table_name}.ts_epoch))>{cutoff}
AND {finished_at_column} IS NULL
THEN NULL
WHEN {table_name}.last_heartbeat_ts IS NULL
AND attempt IS NULL
THEN NULL
ELSE
COALESCE(
GREATEST(attempt.attempt_finished_at, attempt.task_ok_finished_at),
next_attempt_start.ts_epoch,
{table_name}.last_heartbeat_ts*1000,
@(extract(epoch from now())::bigint*1000)
) - COALESCE(attempt.started_at, {table_name}.ts_epoch)
END) as duration
""".format(
table_name=table_name,
finished_at_column="COALESCE(attempt.attempt_finished_at, attempt.task_ok_finished_at)",
cutoff=OLD_RUN_FAILURE_CUTOFF_TIME
)
]

    step_table_name = AsyncStepTablePostgres.table_name

    async def get_task_attempt(self, flow_id: str, run_key: str,
                               step_name: str, task_key: str,
                               attempt_id: Optional[int] = None,
                               postprocess: Optional[Callable] = None) -> DBResponse:
"""
Fetches task attempt from DB. Specifying attempt_id will fetch the specific attempt.
Otherwise the newest attempt is returned.
Parameters
----------
flow_id : str
Flow id of the task
run_key : str
Run number or run id of the task
step_name : str
Step name of the task
task_key : str
task id or task name
attempt_id : int (optional)
The specific attempt of the task to be fetched. If not provided, the latest attempt is returned.
postprocess : Callable
A callback function for refining results.
Receives DBResponse as an argument, and should return a DBResponse
Returns
-------
DBResponse
"""
run_id_key, run_id_value = translate_run_key(run_key)
task_id_key, task_id_value = translate_task_key(task_key)
conditions = [
"flow_id = %s",
"{run_id_column} = %s".format(run_id_column=run_id_key),
"step_name = %s",
"{task_id_column} = %s".format(task_id_column=task_id_key)
]
values = [flow_id, run_id_value, step_name, task_id_value]
        # NOTE: compare against None explicitly; attempt ids start at 0, which is falsy.
        if attempt_id is not None:
conditions.append("attempt_id = %s")
values.append(attempt_id)
result, *_ = await self.find_records(
conditions=conditions,
values=values,
order=["attempt_id DESC"],
fetch_single=True,
enable_joins=True,
expanded=True,
postprocess=postprocess
)
return result
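
    # Example usage (a sketch; the identifiers below are hypothetical):
    #
    #   response = await task_table.get_task_attempt(
    #       flow_id="HelloFlow", run_key="1234",
    #       step_name="start", task_key="5678", attempt_id=0)
    #   # response.body carries the expanded task row, including the computed
    #   # attempt_id, started_at, finished_at, status and duration columns.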

    async def get_tasks_for_run(self, flow_id: str, run_key: str,
                                postprocess: Optional[Callable] = None) -> DBResponse:
        """
        Fetches all tasks of a run from the DB.

        Parameters
        ----------
        flow_id : str
            Flow id of the tasks
        run_key : str
            Run number or run id of the tasks
        postprocess : Callable, optional
            A callback function for refining results.
            Receives a DBResponse as an argument and should return a DBResponse.

        Returns
        -------
        DBResponse
        """
run_id_key, run_id_value = translate_run_key(run_key)
conditions = [
"flow_id = %s",
"{run_id_column} = %s".format(run_id_column=run_id_key)
]
values = [flow_id, run_id_value]
result, *_ = await self.find_records(
conditions=conditions,
values=values,
fetch_single=False,
enable_joins=True,
expanded=False,
postprocess=postprocess
)
return result
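
# A postprocess callback is any callable that takes and returns a DBResponse.
# A minimal sketch (keep_finished is illustrative, not part of this module):
#
#   def keep_finished(response: DBResponse) -> DBResponse:
#       rows = [row for row in response.body if row.get("finished_at")]
#       return DBResponse(response_code=response.response_code, body=rows)
#
#   tasks = await task_table.get_tasks_for_run(
#       "HelloFlow", "1234", postprocess=keep_finished)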