services/metadata_service/api/flow.py (35 lines of code) (raw):

from services.data import FlowRow from services.data.postgres_async_db import AsyncPostgresDB from services.utils import read_body from services.metadata_service.api.utils import format_response, \ handle_exceptions import asyncio class FlowApi(object): _flow_table = None lock = asyncio.Lock() def __init__(self, app): app.router.add_route("GET", "/flows", self.get_all_flows) app.router.add_route("GET", "/flows/{flow_id}", self.get_flow) app.router.add_route("POST", "/flows/{flow_id}", self.create_flow) self._async_table = AsyncPostgresDB.get_instance().flow_table_postgres @format_response @handle_exceptions async def create_flow(self, request): """ --- description: create/register a flow tags: - Flow parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" - name: "body" in: "body" description: "body" required: true schema: type: object properties: user_name: type: string tags: type: object system_tags: type: object produces: - 'text/plain' responses: "200": description: successfully created flow row "409": description: CONFLICT record exists """ flow_name = request.match_info.get("flow_id") body = await read_body(request.content) user = body.get("user_name") tags = body.get("tags") system_tags = body.get("system_tags") flow = FlowRow( flow_id=flow_name, user_name=user, tags=tags, system_tags=system_tags ) return await self._async_table.add_flow(flow) @format_response @handle_exceptions async def get_flow(self, request): """ --- description: Get flow by id tags: - Flow parameters: - name: "flow_id" in: "path" description: "flow_id" required: true type: "string" produces: - text/plain responses: "200": description: successful operation. Return flow "404": description: flow not found "405": description: invalid HTTP Method """ flow_name = request.match_info.get("flow_id") return await self._async_table.get_flow(flow_name) @format_response @handle_exceptions async def get_all_flows(self, request): """ --- description: Get all flows tags: - Flow produces: - text/plain responses: "200": description: successful operation. Returned all registered flows "405": description: invalid HTTP Method """ return await self._async_table.get_all_flows()