services/metadata_service/api/step.py (58 lines of code) (raw):

from services.data import StepRow from services.data.tagging_utils import apply_run_tags_to_db_response from services.utils import read_body from services.metadata_service.api.utils import format_response, \ handle_exceptions from services.data.postgres_async_db import AsyncPostgresDB class StepApi(object): _step_table = None def __init__(self, app): app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/steps", self.get_steps ) app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}", self.get_step ) app.router.add_route( "POST", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/step", self.create_step, ) self._async_table = AsyncPostgresDB.get_instance().step_table_postgres self._async_run_table = AsyncPostgresDB.get_instance().run_table_postgres self._db = AsyncPostgresDB.get_instance() @format_response @handle_exceptions async def get_steps(self, request): """ --- description: get all steps associated with the specified run. tags: - Steps parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" produces: - text/plain responses: "200": description: successful operation. returned all steps "405": description: invalid HTTP Method """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") db_response = await self._async_table.get_steps(flow_id, run_number) db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) return db_response @format_response @handle_exceptions async def get_step(self, request): """ --- description: get specified step. "tags" and "system_tags" values will be persisted to DB, but will not be returned by read endpoints - the related run's "tags" and "system_tags" will be returned instead. tags: - Steps parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" produces: - text/plain responses: "200": description: successful operation. Returned specified step "404": description: step not found "405": description: invalid HTTP Method """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") db_response = await self._async_table.get_step(flow_id, run_number, step_name) db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) return db_response @format_response @handle_exceptions async def create_step(self, request): """ --- description: Create step. tags: - Steps parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "run_number" in: "path" description: "run_number" required: true type: "string" - name: "step_name" in: "path" description: "step_name" required: true type: "string" - name: "body" in: "body" description: "body" required: true schema: type: object properties: user_name: type: string tags: type: object system_tags: type: object produces: - text/plain responses: "200": description: successful operation. Registered step "405": description: invalid HTTP Method """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") body = await read_body(request.content) user = body.get("user_name", "") tags = body.get("tags") system_tags = body.get("system_tags") run_number, run_id = await self._db.get_run_ids(flow_id, run_number) step_row = StepRow( flow_id, run_number, run_id, user, step_name, tags=tags, system_tags=system_tags ) db_response = await self._async_table.add_step(step_row) db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) return db_response