services/ui_backend_service/api/artifact.py (99 lines of code) (raw):
from services.data.db_utils import translate_run_key, translate_task_key
from services.utils import handle_exceptions
from .utils import find_records, postprocess_chain, apply_run_tags_postprocess
from ..data.refiner import ArtifactRefiner
class ArtificatsApi(object):
def __init__(self, app, db, cache=None):
self.db = db
self.refiner = ArtifactRefiner(cache=cache.artifact_cache) if cache else None
self._async_table = self.db.artifact_table_postgres
self._async_run_table = self.db.run_table_postgres
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/artifacts",
self.get_artifacts_by_task,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/artifacts",
self.get_artifacts_by_step,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/artifacts",
self.get_artifacts_by_run,
)
@handle_exceptions
async def get_artifacts_by_task(self, request):
"""
---
description: Get all artifacts of specified task
tags:
- Artifact
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Path/step_name'
- $ref: '#/definitions/Params/Path/task_id'
- $ref: '#/definitions/Params/Builtin/_page'
- $ref: '#/definitions/Params/Builtin/_limit'
- $ref: '#/definitions/Params/Builtin/_order'
- $ref: '#/definitions/Params/Builtin/_tags'
- $ref: '#/definitions/Params/Builtin/_group'
- $ref: '#/definitions/Params/Custom/flow_id'
- $ref: '#/definitions/Params/Custom/run_number'
- $ref: '#/definitions/Params/Custom/step_name'
- $ref: '#/definitions/Params/Custom/task_id'
- $ref: '#/definitions/Params/Custom/name'
- $ref: '#/definitions/Params/Custom/type'
- $ref: '#/definitions/Params/Custom/ds_type'
- $ref: '#/definitions/Params/Custom/attempt_id'
- $ref: '#/definitions/Params/Custom/postprocess'
- $ref: '#/definitions/Params/Custom/invalidate'
- $ref: '#/definitions/Params/Custom/user_name'
- $ref: '#/definitions/Params/Custom/ts_epoch'
produces:
- application/json
responses:
"200":
description: Returns all artifacts of specified task
schema:
$ref: '#/definitions/ResponsesArtifactList'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"""
flow_name = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
run_id_key, run_id_value = translate_run_key(run_number)
step_name = request.match_info.get("step_name")
task_id_key, task_id_value = translate_task_key(
request.match_info.get("task_id"))
return await find_records(request,
self._async_table,
initial_conditions=[
"flow_id = %s",
"{run_id_key} = %s".format(
run_id_key=run_id_key),
"step_name = %s",
"{task_id_key} = %s".format(
task_id_key=task_id_key)],
initial_values=[
flow_name, run_id_value, step_name, task_id_value],
allowed_order=self._async_table.keys,
allowed_group=self._async_table.keys,
allowed_filters=self._async_table.keys,
postprocess=postprocess_chain([
apply_run_tags_postprocess(flow_name, run_number, self._async_run_table),
self.get_postprocessor(request)])
)
@handle_exceptions
async def get_artifacts_by_step(self, request):
"""
---
description: Get all artifacts of specified step
tags:
- Artifact
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Path/step_name'
- $ref: '#/definitions/Params/Builtin/_page'
- $ref: '#/definitions/Params/Builtin/_limit'
- $ref: '#/definitions/Params/Builtin/_order'
- $ref: '#/definitions/Params/Builtin/_tags'
- $ref: '#/definitions/Params/Builtin/_group'
- $ref: '#/definitions/Params/Custom/flow_id'
- $ref: '#/definitions/Params/Custom/run_number'
- $ref: '#/definitions/Params/Custom/step_name'
- $ref: '#/definitions/Params/Custom/task_id'
- $ref: '#/definitions/Params/Custom/name'
- $ref: '#/definitions/Params/Custom/type'
- $ref: '#/definitions/Params/Custom/ds_type'
- $ref: '#/definitions/Params/Custom/attempt_id'
- $ref: '#/definitions/Params/Custom/postprocess'
- $ref: '#/definitions/Params/Custom/invalidate'
- $ref: '#/definitions/Params/Custom/user_name'
- $ref: '#/definitions/Params/Custom/ts_epoch'
produces:
- application/json
responses:
"200":
description: Returns all artifacts of specified step
schema:
$ref: '#/definitions/ResponsesArtifactList'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"""
flow_name = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
run_id_key, run_id_value = translate_run_key(run_number)
step_name = request.match_info.get("step_name")
return await find_records(request,
self._async_table,
initial_conditions=[
"flow_id = %s",
"{run_id_key} = %s".format(
run_id_key=run_id_key),
"step_name = %s"],
initial_values=[
flow_name, run_id_value, step_name],
allowed_order=self._async_table.keys,
allowed_group=self._async_table.keys,
allowed_filters=self._async_table.keys,
postprocess=postprocess_chain([
apply_run_tags_postprocess(flow_name, run_number, self._async_run_table),
self.get_postprocessor(request)])
)
@handle_exceptions
async def get_artifacts_by_run(self, request):
"""
---
description: Get all artifacts of specified run
tags:
- Artifact
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Builtin/_page'
- $ref: '#/definitions/Params/Builtin/_limit'
- $ref: '#/definitions/Params/Builtin/_order'
- $ref: '#/definitions/Params/Builtin/_tags'
- $ref: '#/definitions/Params/Builtin/_group'
- $ref: '#/definitions/Params/Custom/flow_id'
- $ref: '#/definitions/Params/Custom/run_number'
- $ref: '#/definitions/Params/Custom/step_name'
- $ref: '#/definitions/Params/Custom/task_id'
- $ref: '#/definitions/Params/Custom/name'
- $ref: '#/definitions/Params/Custom/type'
- $ref: '#/definitions/Params/Custom/ds_type'
- $ref: '#/definitions/Params/Custom/attempt_id'
- $ref: '#/definitions/Params/Custom/postprocess'
- $ref: '#/definitions/Params/Custom/invalidate'
- $ref: '#/definitions/Params/Custom/user_name'
- $ref: '#/definitions/Params/Custom/ts_epoch'
produces:
- application/json
responses:
"200":
description: Returns all artifacts of specified run
schema:
$ref: '#/definitions/ResponsesArtifactList'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"""
flow_name = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
run_id_key, run_id_value = translate_run_key(run_number)
return await find_records(request,
self._async_table,
initial_conditions=[
"flow_id = %s",
"{run_id_key} = %s".format(
run_id_key=run_id_key)],
initial_values=[
flow_name, run_id_value],
allowed_order=self._async_table.keys,
allowed_group=self._async_table.keys,
allowed_filters=self._async_table.keys,
postprocess=postprocess_chain([
apply_run_tags_postprocess(flow_name, run_number, self._async_run_table),
self.get_postprocessor(request)])
)
def get_postprocessor(self, request):
"pass query param &postprocess=true to enable postprocessing of S3 content. Otherwise returns None as postprocessor"
if request.query.get("postprocess", False) in ["true", "True", "1"]:
return self.refiner.postprocess
else:
return None