services/ui_backend_service/api/run.py (90 lines of code) (raw):
from ..data.refiner.parameter_refiner import GetParametersFailed
from services.data.db_utils import DBResponse, translate_run_key
from services.utils import handle_exceptions
from .utils import find_records, web_response, format_response, \
builtin_conditions_query, pagination_query, query_param_enabled
class RunApi(object):
def __init__(self, app, db, cache=None):
self.db = db
app.router.add_route(
"GET", "/runs", self.get_all_runs)
app.router.add_route(
"GET", "/flows/{flow_id}/runs", self.get_flow_runs)
app.router.add_route(
"GET", "/flows/{flow_id}/runs/{run_number}", self.get_run)
app.router.add_route(
"GET", "/flows/{flow_id}/runs/{run_number}/parameters", self.get_run_parameters)
self._async_table = self.db.run_table_postgres
self._artifact_table = self.db.artifact_table_postgres
self._artifact_store = getattr(cache, "artifact_cache", None)
@handle_exceptions
async def get_run(self, request):
"""
---
description: Get one run
tags:
- Run
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
produces:
- application/json
responses:
"200":
description: Returns one run
schema:
$ref: '#/definitions/ResponsesRun'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"""
flow_name = request.match_info.get("flow_id")
run_id_key, run_id_value = translate_run_key(
request.match_info.get("run_number"))
return await find_records(request,
self._async_table,
fetch_single=True,
initial_conditions=[
"flow_id = %s",
"{run_id_key} = %s".format(
run_id_key=run_id_key),
],
initial_values=[flow_name, run_id_value],
enable_joins=True)
@handle_exceptions
async def get_all_runs(self, request):
"""
---
description: Get all runs
tags:
- Run
parameters:
- $ref: '#/definitions/Params/Builtin/_page'
- $ref: '#/definitions/Params/Builtin/_limit'
- $ref: '#/definitions/Params/Builtin/_order'
- $ref: '#/definitions/Params/Builtin/_tags'
- $ref: '#/definitions/Params/Builtin/_group'
- $ref: '#/definitions/Params/Custom/run_number'
- $ref: '#/definitions/Params/Custom/run_id'
- $ref: '#/definitions/Params/Custom/user_name'
- $ref: '#/definitions/Params/Custom/ts_epoch'
- $ref: '#/definitions/Params/Custom/finished_at'
- $ref: '#/definitions/Params/Custom/duration'
- $ref: '#/definitions/Params/Custom/status'
produces:
- application/json
responses:
"200":
description: Returns all runs
schema:
$ref: '#/definitions/ResponsesRunList'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"""
allowed_order = self._async_table.keys + ["user", "run", "finished_at", "duration", "status"]
allowed_group = self._async_table.keys + ["user"]
allowed_filters = self._async_table.keys + ["user", "run", "finished_at", "duration", "status"]
# JSONB tag filters combined with `ORDER BY` causes performance impact
# due to lack of pg statistics on JSONB fields. To battle this, first execute
# subquery of ordered list from runs_v3 table in and filter by tags on outer query.
# This needs more research in the future to further improve performance.
builtin_conditions, _ = builtin_conditions_query(request)
has_tag_filter = len([s for s in builtin_conditions if 'tags||system_tags' in s]) > 0
if has_tag_filter:
allowed_optimized_order = ['flow_id', 'ts_epoch']
allowed_unoptimized_order = [o for o in allowed_group if o not in allowed_optimized_order]
_, _, _, optimized_order, _, _ = pagination_query(request, allowed_order=allowed_optimized_order)
_, _, _, unoptimized_order, _, _ = pagination_query(request, allowed_order=allowed_unoptimized_order)
# Allow optimized order only when sorting by real columns only
if optimized_order and not unoptimized_order:
overwrite_select_from = "(SELECT * FROM {table_name} {order_by}) AS {table_name}".format(
order_by="ORDER BY {}".format(", ".join(optimized_order)),
table_name=self._async_table.table_name
)
return await find_records(request, self._async_table,
allowed_group=allowed_group,
allowed_filters=allowed_filters,
enable_joins=True,
overwrite_select_from=overwrite_select_from
)
return await find_records(request, self._async_table,
allowed_order=allowed_order,
allowed_group=allowed_group,
allowed_filters=allowed_filters,
enable_joins=True
)
@handle_exceptions
async def get_flow_runs(self, request):
"""
---
description: Get all runs of specified flow
tags:
- Run
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Builtin/_page'
- $ref: '#/definitions/Params/Builtin/_limit'
- $ref: '#/definitions/Params/Builtin/_order'
- $ref: '#/definitions/Params/Builtin/_tags'
- $ref: '#/definitions/Params/Builtin/_group'
- $ref: '#/definitions/Params/Custom/run_number'
- $ref: '#/definitions/Params/Custom/run_id'
- $ref: '#/definitions/Params/Custom/user_name'
- $ref: '#/definitions/Params/Custom/ts_epoch'
- $ref: '#/definitions/Params/Custom/finished_at'
- $ref: '#/definitions/Params/Custom/duration'
- $ref: '#/definitions/Params/Custom/status'
produces:
- application/json
responses:
"200":
description: Returns all runs of specified flow
schema:
$ref: '#/definitions/ResponsesRunList'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"""
flow_name = request.match_info.get("flow_id")
return await find_records(request,
self._async_table,
initial_conditions=["flow_id = %s"],
initial_values=[flow_name],
allowed_order=self._async_table.keys + ["user", "run", "finished_at", "duration", "status"],
allowed_group=self._async_table.keys + ["user"],
allowed_filters=self._async_table.keys + ["user", "run", "finished_at", "duration", "status"],
enable_joins=True
)
@handle_exceptions
async def get_run_parameters(self, request):
"""
---
description: Get parameters of a run
tags:
- Run
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Custom/invalidate'
produces:
- application/json
responses:
"200":
description: Returns parameters of a run
schema:
$ref: '#/definitions/ResponsesRunParameters'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"500":
description: Internal Server Error (with error id)
schema:
$ref: '#/definitions/ResponsesRunParametersError500'
"""
flow_name = request.match_info['flow_id']
run_number = request.match_info.get("run_number")
invalidate_cache = query_param_enabled(request, "invalidate")
# _artifact_store.get_run_parameters will translate run_number/run_id properly
combined_results = await self._artifact_store.get_run_parameters(
flow_name, run_number, invalidate_cache=invalidate_cache)
postprocess_error = combined_results.get("postprocess_error", None)
if postprocess_error:
raise GetParametersFailed(
postprocess_error["detail"], postprocess_error["id"], postprocess_error["traceback"])
else:
response = DBResponse(200, combined_results)
status, body = format_response(request, response)
return web_response(status, body)