services/ui_backend_service/api/autocomplete.py (100 lines of code) (raw):

from services.utils import handle_exceptions, logging from services.data.db_utils import DBResponse, DBPagination, translate_run_key from .utils import format_response_list, web_response, custom_conditions_query, pagination_query, operators_to_filters import sys import asyncio TAGS_FILL_INTERVAL_SECONDS = 60 * 5 class AutoCompleteApi(object): def __init__(self, app, db): self.db = db # Cached resources # Cache tags so we don't have to request DB everytime self.tags = [] self.logger = logging.getLogger("AutoCompleteApi") app.router.add_route("GET", "/tags/autocomplete", self.get_tags) # Non-cached resources app.router.add_route("GET", "/flows/autocomplete", self.get_flows) app.router.add_route("GET", "/flows/{flow_id}/runs/autocomplete", self.get_runs_for_flow) app.router.add_route("GET", "/flows/{flow_id}/runs/{run_id}/steps/autocomplete", self.get_steps_for_run) app.router.add_route("GET", "/flows/{flow_id}/runs/{run_id}/artifacts/autocomplete", self.get_artifacts_for_run) loop = asyncio.get_event_loop() loop.create_task(self.periodic_tags_fetch_and_cache()) async def periodic_tags_fetch_and_cache(self): ''' Async task that fill tags cache every 5minutes. Database query might take a while so its better to cache the result. ''' while True: await self.update_cached_tags() # Check tags again after some sleep await asyncio.sleep(TAGS_FILL_INTERVAL_SECONDS) async def update_cached_tags(self): # Get all tags that are mentioned in runs table res, _ = await self.db.run_table_postgres.get_tags() if res.response_code == 200: self.tags = sorted(res.body) size = sys.getsizeof(self.tags) // 1024 // 1024 count = len(self.tags) self.logger.info("{} cached tags in memory consuming {} Mb".format(count, size)) @handle_exceptions async def get_tags(self, request): """ --- description: Get all available tags, with pagination. tags: - Autocomplete produces: - application/json responses: "200": description: Returns string list of all tags schema: $ref: '#/definitions/ResponsesAutocompleteTagList' """ # pagination setup page, limit, offset, _, _, _ = pagination_query(request) filter_func = None for key, val in request.query.items(): deconstruct = key.split(":", 1) if len(deconstruct) > 1: field = deconstruct[0] operator = deconstruct[1] else: field = key operator = None if field == 'tag' and operator in operators_to_filters: filter_func = operators_to_filters[operator] if filter_func: tags = [tag for tag in self.tags if filter_func(tag, val)][offset:(offset + limit)] else: tags = self.tags[offset:(offset + limit)] count = len(tags) pagination = DBPagination(limit, offset, count, page) status, body = format_response_list(request, DBResponse(200, tags), pagination, page) return web_response(status, body) @handle_exceptions async def get_flows(self, request): """ --- description: Get all flow id's as a list, with pagination. tags: - Autocomplete - Flow produces: - application/json responses: "200": description: Returns string list of all flow id's schema: $ref: '#/definitions/ResponsesAutocompleteFlowList' """ return await resource_response(request, self.db.flow_table_postgres.get_flow_ids, allowed_keys=["flow_id"]) @handle_exceptions async def get_runs_for_flow(self, request): """ --- description: Get all run id's for single flow tags: - Autocomplete - Run produces: - application/json responses: "200": description: Returns string list of run ids schema: $ref: '#/definitions/ResponsesAutocompleteRunList' """ flow_id = request.match_info.get("flow_id") return await resource_response( request, self.db.run_table_postgres.get_run_keys, initial_conditions=["flow_id=%s"], initial_values=[flow_id], allowed_keys=["run"] ) @handle_exceptions async def get_steps_for_run(self, request): """ --- description: Get all step names for single run tags: - Autocomplete - Step produces: - application/json responses: "200": description: Returns string list of step names schema: $ref: '#/definitions/ResponsesAutocompleteStepList' """ flow_id = request.match_info.get("flow_id") run_id = request.match_info.get("run_id") run_key, run_value = translate_run_key(run_id) return await resource_response( request, self.db.step_table_postgres.get_step_names, initial_conditions=["flow_id=%s", "{}=%s".format(run_key)], initial_values=[flow_id, run_value], allowed_keys=["step_name"] ) @handle_exceptions async def get_artifacts_for_run(self, request): """ --- description: Get all artifact names for single run tags: - Autocomplete - Artifact produces: - application/json responses: "200": description: Returns string list of step names schema: $ref: '#/definitions/ResponsesAutocompleteArtifactList' """ flow_id = request.match_info.get("flow_id") run_id = request.match_info.get("run_id") run_key, run_value = translate_run_key(run_id) return await resource_response( request, self.db.artifact_table_postgres.get_artifact_names, initial_conditions=["flow_id=%s", "{}=%s".format(run_key)], initial_values=[flow_id, run_value], allowed_keys=["name"] ) async def resource_response(request, get_record_fun, initial_conditions=[], initial_values=[], allowed_keys=[]): """ Abstract resource fetch helper that processes query and pagination parameters from the request, and performs a db query with the generated conditions, with a provided db getter. Parameters ---------- request : WebRequest aiohttp web request with .query key available. get_record_fun : Callable DB getter that should accept the following variables: (conditions, values, limit, offset) initial_conditions : List (optional) optional list of initial conditions to pass the db getter, along with values extracted from request parameters. initial_values : List (optional) optional list of initial values, for the initial conditions. allowed_keys : List (optional) optional list of allowed keys. Used to determine which keys are extracted from request parameters and which should be omitted. Returns ------- WebResponse A formatted web response with the default API response body. """ # pagination setup page, limit, offset, _, _, _ = pagination_query(request) # custom query conditions custom_conditions, custom_vals = custom_conditions_query(request, allowed_keys=allowed_keys) conditions = initial_conditions + custom_conditions values = initial_values + custom_vals db_response, pagination = await get_record_fun(conditions=conditions, values=values, limit=limit, offset=offset) status, body = format_response_list(request, db_response, pagination, page) return web_response(status, body)