services/ui_backend_service/api/dag.py (40 lines of code) (raw):
from services.data.db_utils import DBResponse, translate_run_key
from services.utils import handle_exceptions
from .utils import format_response, web_response, query_param_enabled
from services.ui_backend_service.data.db.utils import get_run_dag_data
class DagApi(object):
def __init__(self, app, db, cache=None):
self.db = db
app.router.add_route(
"GET", "/flows/{flow_id}/runs/{run_number}/dag", self.get_run_dag
)
self._dag_store = getattr(cache, "dag_cache", None)
@handle_exceptions
async def get_run_dag(self, request):
"""
---
description: Get DAG structure for a run.
tags:
- Run
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Custom/invalidate'
produces:
- application/json
responses:
"200":
description: Return DAG structure for a specific run
schema:
$ref: '#/definitions/ResponsesDag'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"404":
description: necessary data for DAG generation Not Found
schema:
$ref: '#/definitions/ResponsesError404'
"500":
description: Internal Server Error (with error id)
schema:
$ref: '#/definitions/ResponsesDagError500'
"""
flow_name = request.match_info['flow_id']
run_number = request.match_info.get("run_number")
# Before running the cache action, we make sure that the run has
# the necessary data to generate a DAG.
db_response = await get_run_dag_data(self.db, flow_name, run_number)
if not db_response.response_code == 200:
# DAG data was not found, return with the corresponding status.
status, body = format_response(request, db_response)
return web_response(status, body)
# Prefer run_id over run_number
flow_name = db_response.body['flow_id']
run_id = db_response.body.get('run_id') or db_response.body['run_number']
invalidate_cache = query_param_enabled(request, "invalidate")
dag = await self._dag_store.cache.GenerateDag(
flow_name, run_id, invalidate_cache=invalidate_cache)
if dag.has_pending_request():
async for event in dag.stream():
if event["type"] == "error":
# raise error, there was an exception during processing.
raise GenerateDAGFailed(event["message"], event["id"], event["traceback"])
await dag.wait() # wait until results are ready
dag = dag.get()
response = DBResponse(200, dag)
status, body = format_response(request, response)
return web_response(status, body)
class GenerateDAGFailed(Exception):
def __init__(self, msg="Failed to process DAG", id="failed-to-process-dag", traceback_str=None):
self.message = msg
self.id = id
self.traceback_str = traceback_str
def __str__(self):
return self.message