services/metadata_service/api/metadata.py (82 lines of code) (raw):

from aiohttp import web import json from services.utils import read_body from services.metadata_service.api.utils import format_response, \ handle_exceptions import asyncio from services.data.postgres_async_db import AsyncPostgresDB class MetadataApi(object): _metadata_table = None lock = asyncio.Lock() def __init__(self, app): app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/" "tasks/{task_id}/metadata", self.get_metadata, ) app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/metadata", self.get_metadata_by_run, ) app.router.add_route( "POST", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/" "tasks/{task_id}/metadata", self.create_metadata, ) self._db = AsyncPostgresDB.get_instance() self._async_table = AsyncPostgresDB.get_instance().metadata_table_postgres @format_response @handle_exceptions async def get_metadata(self, request): """ --- description: get all metadata associated with the specified task. tags: - Metadata parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" - name: "task_id" in: "path" description: "task_id" required: true type: "string" produces: - text/plain responses: "200": description: successful operation "405": description: invalid HTTP Method """ flow_name = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") return await self._async_table.get_metadata( flow_name, run_number, step_name, task_id ) @format_response @handle_exceptions async def get_metadata_by_run(self, request): """ --- description: get all metadata associated with the specified run. tags: - Metadata parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" produces: - text/plain responses: "200": description: successful operation "405": description: invalid HTTP Method """ flow_name = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") return await self._async_table.get_metadata_in_runs( flow_name, run_number ) async def create_metadata(self, request): """ --- description: persist metadata tags: - Metadata parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" - name: "task_id" in: "path" description: "task_id" required: true type: "string" - name: "body" in: "body" description: "body" required: true schema: type: array items: type: object properties: field_name: type: string value: type: string type: type: string user_name: type: string tags: type: object system_tags: type: object produces: - 'text/plain' responses: "202": description: successful operation. "405": description: invalid HTTP Method """ flow_name = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") body = await read_body(request.content) count = 0 try: run_number, run_id = await self._db.get_run_ids(flow_name, run_number) task_id, task_name = await self._db.get_task_ids(flow_name, run_number, step_name, task_id) except Exception: return web.Response(status=400, body=json.dumps( {"message": "need to register run_id and task_id first"})) for datum in body: values = { "flow_id": flow_name, "run_number": run_number, "run_id": run_id, "step_name": step_name, "task_id": task_id, "task_name": task_name, "field_name": datum.get("field_name", " "), "value": datum.get("value", " "), "type": datum.get("type", " "), "user_name": datum.get("user_name"), "tags": datum.get("tags"), "system_tags": datum.get("system_tags"), } metadata_response = await self._async_table.add_metadata(**values) if metadata_response.response_code == 200: count = count + 1 result = {"metadata_created": count} return web.Response(body=json.dumps(result))