services/ui_backend_service/data/db/tables/artifact.py (72 lines of code) (raw):
from typing import List, Optional, Tuple

from .base import AsyncPostgresTable
from .task import AsyncTaskTablePostgres
from ..models import ArtifactRow
# use schema constants from the .data module to keep things consistent
from services.data.postgres_async_db import AsyncArtifactTablePostgres as MetadataArtifactTable
from services.data.db_utils import translate_run_key, DBResponse, DBPagination
class AsyncArtifactTablePostgres(AsyncPostgresTable):
    """
    Async accessor for the artifact table.

    The table name, keys and primary keys are taken from the metadata
    service's artifact table definition (``MetadataArtifactTable``) so the
    two stay consistent.
    """
    _row_type = ArtifactRow
    table_name = MetadataArtifactTable.table_name
    task_table_name = AsyncTaskTablePostgres.table_name
    ordering = ["attempt_id DESC"]
    keys = MetadataArtifactTable.keys
    primary_keys = MetadataArtifactTable.primary_keys
    # No trigger keys/operations are configured for this table.
    trigger_keys = None
    trigger_operations = None
    select_columns = keys

    async def get_run_parameter_artifacts(self, flow_name, run_number, postprocess=None, invalidate_cache=False):
        """
        Find the public parameter artifacts of a run.

        Parameters
        ----------
        flow_name : str
            value matched against the flow_id column
        run_number : str
            run identifier, passed through translate_run_key to pick the
            column and value to filter on
        postprocess : (optional) (default None)
            forwarded as-is to find_records
        invalidate_cache : bool (optional) (default False)
            forwarded as-is to find_records

        Returns
        -------
        The result of find_records, unchanged.
        """
        run_id_key, run_id_value = translate_run_key(run_number)

        # '_parameters' step has all the parameters as artifacts. only pick the
        # public parameters (no underscore prefix)
        return await self.find_records(
            conditions=[
                "flow_id = %s",
                "{run_id_key} = %s".format(run_id_key=run_id_key),
                "step_name = %s",
                "name NOT LIKE %s",
                "name <> %s",
                "name <> %s"
            ],
            values=[
                flow_name,
                run_id_value,
                "_parameters",
                # escaped underscore: match a literal leading '_' instead of
                # the LIKE single-character wildcard
                r"\_%",
                "name",  # exclude the 'name' parameter as this always exists, and contains the FlowName
                "script_name"  # exclude the internally used 'script_name' parameter.
            ],
            fetch_single=False,
            expanded=True,
            postprocess=postprocess,
            invalidate_cache=invalidate_cache
        )

    async def get_artifact_names(self, conditions: Optional[List[str]] = None,
                                 values: Optional[List[str]] = None, limit: int = 0,
                                 offset: int = 0) -> Tuple[DBResponse, DBPagination]:
        """
        Get a paginated set of artifact names.

        Parameters
        ----------
        conditions : List[str] (optional) (default None)
            list of conditions to pass the sql execute, with %s placeholders for values.
            None is treated as an empty list (no WHERE clause).
        values : List[str] (optional) (default None)
            list of values to be passed for the sql execute.
            None is treated as an empty list.
        limit : int (optional) (default 0)
            limit for the number of results. A falsy value omits the LIMIT clause.
        offset : int (optional) (default 0)
            offset for the results. A falsy value omits the OFFSET clause.

        Returns
        -------
        (DBResponse, DBPagination)
            the DBResponse body is a flat list of the selected names
        """
        # None sentinels instead of mutable [] defaults, so no list object is
        # shared between calls.
        conditions = [] if conditions is None else conditions
        values = [] if values is None else values

        # The inner DISTINCT is taken per (name, flow_id, run_number, run_id),
        # which keeps those columns available to the caller-supplied conditions.
        sql_template = """
            SELECT name FROM (
                SELECT DISTINCT name, flow_id, run_number, run_id
                FROM {table_name}
            ) T
            {conditions}
            {limit}
            {offset}
            """

        where_clause = "WHERE {}".format(" AND ".join(conditions)) if conditions else ""
        select_sql = sql_template.format(
            table_name=self.table_name,
            conditions=where_clause,
            limit="LIMIT {}".format(limit) if limit else "",
            offset="OFFSET {}".format(offset) if offset else ""
        )

        res, pag = await self.execute_sql(select_sql=select_sql, values=values, fetch_single=False,
                                          expanded=False,
                                          limit=limit, offset=offset, serialize=False)

        # process the unserialized DBResponse: each row holds a single
        # column (name), so flatten the rows into a plain list.
        _body = [row[0] for row in res.body]

        return DBResponse(res.response_code, _body), pag

    async def get_run_graph_info_artifact(self, flow_name: str, run_id: str) -> DBResponse:
        """
        Tries to locate '_graph_info' in run artifacts

        Parameters
        ----------
        flow_name : str
            value matched against the flow_id column
        run_id : str
            run identifier, passed through translate_run_key to pick the
            column and value to filter on

        Returns
        -------
        DBResponse
            first element of the find_records result for the '_graph_info'
            artifact of the '_parameters' step
        """
        run_id_key, run_id_value = translate_run_key(run_id)

        # Only the first element of the find_records result is needed here;
        # the remaining elements are discarded.
        db_response, *_ = await self.find_records(
            conditions=[
                "flow_id = %s",
                "{run_id_key} = %s".format(
                    run_id_key=run_id_key),
                "step_name = %s",
                "name = %s"
            ],
            values=[
                flow_name, run_id_value, "_parameters",
                "_graph_info",
            ],
            fetch_single=True, expanded=True
        )

        return db_response