services/metadata_service/api/artifact.py (208 lines of code) (raw):

from aiohttp import web from services.data.postgres_async_db import AsyncPostgresDB from services.data.db_utils import ( filter_artifacts_for_latest_attempt, filter_artifacts_by_attempt_id_for_tasks, ) from services.data.tagging_utils import apply_run_tags_to_db_response from services.utils import read_body from services.metadata_service.api.utils import ( format_response, handle_exceptions, http_500, ) import json class ArtificatsApi(object): _artifact_table = None def __init__(self, app): app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/" "tasks/{task_id}/artifacts/{artifact_name}", self.get_artifact, ) app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/" "tasks/{task_id}/artifacts/{artifact_name}/attempt/{attempt_id}", self.get_artifact_with_attempt, ) app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/" "tasks/{task_id}/artifacts", self.get_artifacts_by_task, ) app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/" "tasks/{task_id}/attempt/{attempt_id}/artifacts", self.get_artifacts_by_task_attempt, ) app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/artifacts", self.get_artifacts_by_step, ) app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/artifacts", self.get_artifacts_by_run, ) app.router.add_route( "POST", "/flows/{flow_id}/runs/{run_number}/steps/" "{step_name}/tasks/{task_id}/artifact", self.create_artifacts, ) self._async_table = AsyncPostgresDB.get_instance().artifact_table_postgres self._async_run_table = AsyncPostgresDB.get_instance().run_table_postgres self._db = AsyncPostgresDB.get_instance() @format_response @handle_exceptions async def get_artifact(self, request): """ --- description: get all artifacts associated with the specified task. tags: - Artifacts parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" - name: "task_id" in: "path" description: "task_id" required: true type: "string" - name: "artifact_name" in: "path" description: "artifact_name" required: true type: "string" produces: - text/plain responses: "200": description: successful operation "405": description: invalid HTTP Method """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") artifact_name = request.match_info.get("artifact_name") db_response = await self._async_table.get_artifact( flow_id, run_number, step_name, task_id, artifact_name ) db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) return db_response @format_response @handle_exceptions async def get_artifact_with_attempt(self, request): """ --- description: get all artifacts associated with the specified task. tags: - Artifacts parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" - name: "task_id" in: "path" description: "task_id" required: true type: "string" - name: "artifact_name" in: "path" description: "artifact_name" required: true type: "string" - name: "attempt_id" in: "path" description: "attempt_id" required: true type: "integer" produces: - text/plain responses: "200": description: successful operation "405": description: invalid HTTP Method """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") artifact_name = request.match_info.get("artifact_name") attempt_id = request.match_info.get("attempt_id") db_response = await self._async_table.get_artifact_by_attempt( flow_id, run_number, step_name, task_id, artifact_name, attempt_id ) db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) return db_response async def get_artifacts_by_task(self, request): """ --- description: get all artifacts associated with the specified task. tags: - Artifacts parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" - name: "task_id" in: "path" description: "task_id" required: true type: "string" produces: - text/plain responses: "200": description: successful operation "405": description: invalid HTTP Method """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") db_response = await self._async_table.get_artifact_in_task( flow_id, run_number, step_name, task_id ) if db_response.response_code == 200: db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) filtered_body = filter_artifacts_for_latest_attempt(db_response.body) return web.Response( status=db_response.response_code, body=json.dumps(filtered_body) ) else: return web.Response( status=db_response.response_code, body=json.dumps(http_500(db_response.body)), ) async def get_artifacts_by_task_attempt(self, request): """ --- description: get all artifacts associated with the specified task. tags: - Artifacts parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" - name: "task_id" in: "path" description: "task_id" required: true type: "string" - name: "attempt_id" in: "path" description: "attempt_id" required: true type: "integer" produces: - text/plain responses: "200": description: successful operation "405": description: invalid HTTP Method """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") attempt_id = request.match_info.get("attempt_id") db_response = await self._async_table.get_artifact_in_task( flow_id, run_number, step_name, task_id ) if db_response.response_code == 200: db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) if db_response.body: attempt_for_task = {db_response.body[0]["task_id"]: int(attempt_id)} else: # Doesn't matter attempt_for_task = {} filtered_body = filter_artifacts_by_attempt_id_for_tasks( db_response.body, attempt_for_task ) return web.Response( status=db_response.response_code, body=json.dumps(filtered_body) ) else: return web.Response( status=db_response.response_code, body=json.dumps(http_500(db_response.body)), ) async def get_artifacts_by_step(self, request): """ --- description: get all artifacts associated with the specified task. tags: - Artifacts parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" produces: - text/plain responses: "200": description: successful operation "405": description: invalid HTTP Method """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") db_response = await self._async_table.get_artifact_in_steps( flow_id, run_number, step_name ) if db_response.response_code == 200: db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) filtered_body = filter_artifacts_for_latest_attempt(db_response.body) return web.Response( status=db_response.response_code, body=json.dumps(filtered_body) ) else: return web.Response( status=db_response.response_code, body=json.dumps(http_500(db_response.body)), ) async def get_artifacts_by_run(self, request): """ --- description: get all artifacts associated with the specified task. tags: - Artifacts parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" produces: - text/plain responses: "200": description: successful operation "405": description: invalid HTTP Method """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") db_response = await self._async_table.get_artifacts_in_runs(flow_id, run_number) if db_response.response_code == 200: db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) filtered_body = filter_artifacts_for_latest_attempt(db_response.body) return web.Response( status=db_response.response_code, body=json.dumps(filtered_body) ) else: return web.Response( status=db_response.response_code, body=json.dumps(http_500(db_response.body)), ) async def create_artifacts(self, request): """ --- description: This end-point allow to test that service is up. "tags" and "system_tags" values will be persisted to DB, but will not be returned by read endpoints - the related run's "tags" and "system_tags" will be returned instead. tags: - Artifacts parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" - name: "task_id" in: "path" description: "task_id" required: true type: "string" - name: "body" in: "body" description: "body" required: true schema: type: array items: type: object properties: name: type: string location: type: string ds_type: type: string content_type: type: string attempt_id: type: integer user_name: type: string tags: type: object system_tags: type: object produces: - 'text/plain' responses: "202": description: successful operation. "405": description: invalid HTTP Method """ # { # "name": "test", # "location": "test", # "ds_type": "content", # "sha": "test", # "type": "content", # "content_type": "content", # "attempt_id": 0, # "user_name": "fhamad", # "tags": { # "user": "fhamad" # }, # "system_tags": { # "user": "fhamad" # } # } flow_name = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") body = await read_body(request.content) count = 0 try: run_number, run_id = await self._db.get_run_ids(flow_name, run_number) task_id, task_name = await self._db.get_task_ids( flow_name, run_number, step_name, task_id ) except Exception: return web.Response( status=400, body=json.dumps( {"message": "need to register run_id and task_id first"} ), ) # todo change to bulk insert for artifact in body: values = { "flow_id": flow_name, "run_number": run_number, "run_id": run_id, "step_name": step_name, "task_id": task_id, "task_name": task_name, "name": artifact.get("name", " "), "location": artifact.get("location", " "), "ds_type": artifact.get("ds_type", " "), "sha": artifact.get("sha", " "), "type": artifact.get("type", " "), "content_type": artifact.get("content_type", " "), "attempt_id": artifact.get("attempt_id", 0), "user_name": artifact.get("user_name", " "), "tags": artifact.get("tags"), "system_tags": artifact.get("system_tags"), } artifact_response = await self._async_table.add_artifact(**values) if artifact_response.response_code == 200: count = count + 1 result = {"artifacts_created": count} return web.Response(body=json.dumps(result))