services/ui_backend_service/data/db/utils.py (7 lines of code) (raw):

from services.data.db_utils import DBResponse from services.ui_backend_service.data.db.postgres_async_db import AsyncPostgresDB async def get_run_dag_data(db: AsyncPostgresDB, flow_name: str, run_number: str) -> DBResponse: """ Fetches either a _graph_info artifact, or a code-package metadata entry if the artifact is missing. Used to determine whether a run can display a DAG. """ db_response = await db.artifact_table_postgres.get_run_graph_info_artifact(flow_name, run_number) if not db_response.response_code == 200: # Try to look for codepackage if graph artifact is missing db_response = await db.metadata_table_postgres.get_run_codepackage_metadata(flow_name, run_number) return db_response