services/ui_backend_service/api/search.py (172 lines of code) (raw):

import asyncio
import json
from collections import defaultdict
from typing import Tuple
from urllib.parse import unquote_plus

from aiohttp import web

from services.data.db_utils import translate_run_key
from services.data.tagging_utils import apply_run_tags_to_db_response
from services.utils import handle_exceptions, logging
from services.ui_backend_service.data.cache.utils import (
    search_result_event_msg, error_event_msg
)
from .utils import query_param_enabled

SCOPE_ARTIFACT = 'ARTIFACT'
SCOPE_FOREACH_VARIABLE = 'FOREACH_VARIABLE'

# Keys copied from a database row into a search result.
_ARTIFACT_RESULT_KEYS = ('flow_id', 'run_number', 'step_name', 'task_id', '_foreach_stack')
_METADATA_RESULT_KEYS = ('flow_id', 'run_number', 'step_name', 'task_id', 'value')


class SearchApi(object):
    """
    Websocket API for searching the tasks of a run by artifact name/content
    and by foreach variable.

    Results for a request are accumulated in ``search_results`` under a
    request identifier while the search scopes run, and are removed again
    once the request handler finishes.
    """

    def __init__(self, app, db, cache=None):
        self.db = db
        # Old search route kept for backwards compatibility
        app.router.add_route(
            "GET", "/flows/{flow_id}/runs/{run_number}/search", self.get_run_tasks
        )
        app.router.add_route(
            "GET", "/search/flows/{flow_id}/runs/{run_number}", self.get_run_tasks
        )
        self._artifact_table = self.db.artifact_table_postgres
        self._metadata_table = self.db.metadata_table_postgres
        self._run_table = self.db.run_table_postgres
        self._artifact_store = getattr(cache, "artifact_cache", None)
        # request identifier -> list of accumulated results
        self.search_results = defaultdict(list)
        # request identifier -> set of "step_name/task_id" keys already in the results
        self.search_result_keyset = defaultdict(set)

    @handle_exceptions
    async def get_run_tasks(self, request):
        """
        Websocket handler that runs the requested search scopes concurrently.

        The ``scope`` query parameter is a comma separated list of scopes
        (``ARTIFACT`` and/or ``FOREACH_VARIABLE``); it defaults to ``ARTIFACT``
        when missing or empty. On failure an error event is sent and the
        socket is closed with code 1011.

        Returns
        -------
        web.WebSocketResponse
        """
        scope_param = _decode_url_param_value(request.query.get('scope', SCOPE_ARTIFACT))
        # An empty ``scope=`` decodes to None; fall back to the default scope
        # instead of failing on ``None.split``.
        scope_list = (scope_param or SCOPE_ARTIFACT).split(',')

        ws = web.WebSocketResponse()
        await ws.prepare(request)

        tasks = []
        try:
            if SCOPE_ARTIFACT in scope_list:
                tasks.append(self.search_artifacts(request, ws))
            if SCOPE_FOREACH_VARIABLE in scope_list:
                tasks.append(self.search_foreach_variable(request, ws))
            await asyncio.gather(*tasks)
        except Exception:
            logging.exception("Filter tasks failed.")
            await ws.send_str(json.dumps({"event": error_event_msg("Filter tasks failed", "filter-tasks-failed")}))
            # 1011 is the websocket "internal error" close code.
            await ws.close(code=1011)
        finally:
            # Clean up the search results so that results dict won't grow indefinitely.
            # Done in `finally` so it also happens when reporting the error fails.
            request_identifier = _construct_requset_identifier(request)
            self.search_results.pop(request_identifier, None)
            self.search_result_keyset.pop(request_identifier, None)

        return ws

    async def search_foreach_variable(self, request, ws):
        """
        Search over the foreach variables for a given run.

        This searches over the metadata table, and updates results with all tasks
        that have the specified variable name in their foreach stack. The
        accumulated results for the request are then sent over the websocket.
        """
        flow_name = request.match_info['flow_id']
        run_key = request.match_info['run_number']
        key = request.query['key']
        value = _decode_url_param_value(request.query.get('value', None))

        # With a value, match "key=value"; otherwise match on the key alone.
        metadata_key = key + "=" + value if value else key
        metadata_items = await self.get_run_metadata(flow_name, run_key, "foreach-stack")

        results = [
            {**_metadata_result_format(item), "searchable": True}
            for item in metadata_items
            if metadata_key in str(item['value'])
        ]

        request_identifier = _construct_requset_identifier(request)
        self.union_results(results, request_identifier)
        await ws.send_str(json.dumps({"event": search_result_event_msg(self.search_results[request_identifier])}))

    async def search_artifacts(self, request, ws):
        """
        Search over the artifacts for a given run.

        This search is handled by subprocesses of CacheStore.
        This updates results with all tasks that have the specified artifact name,
        and sends the accumulated results for the request over the websocket.
        """
        flow_name = request.match_info['flow_id']
        run_key = request.match_info['run_number']
        key = request.query['key']
        value = request.query.get('value', None)
        invalidate_cache = query_param_enabled(request, "invalidate")

        meta_artifacts = await self.get_run_artifacts(flow_name, run_key, key)

        if value is None:
            # For empty search terms simply return the list of tasks that have artifacts by the specified name.
            # Do not unnecessarily hit the cache.
            results = [{**_artifact_result_format(art), "searchable": True} for art in meta_artifacts]
        else:
            operator, value = _parse_search_term(value)
            # Search through the artifact contents using the CacheClient
            pathspecs = [_artifact_pathspec(art) for art in meta_artifacts]
            res = await self._artifact_store.cache.SearchArtifacts(
                pathspecs, value, operator,
                invalidate_cache=invalidate_cache)

            if res.has_pending_request():
                # Forward cache events to the client while the search is in progress.
                async for event in res.stream():
                    await ws.send_str(json.dumps(event))
                await res.wait()

            artifact_data = res.get()
            results = await _search_dict_filter(meta_artifacts, artifact_data)

        request_identifier = _construct_requset_identifier(request)
        self.union_results(results, request_identifier)
        await ws.send_str(json.dumps({"event": search_result_event_msg(self.search_results[request_identifier])}))

    async def get_run_artifacts(self, flow_name, run_key, artifact_name):
        """
        Find a set of artifacts to perform the search over.
        Includes localstore artifacts as well, as we want to return that these could not be searched over.

        Returns the body of the artifact table response for the given flow,
        run and artifact name, after run tags have been applied to it.
        """
        run_id_key, run_id_value = translate_run_key(run_key)

        db_response = await self._artifact_table.get_records(
            filter_dict={
                "flow_id": flow_name,
                run_id_key: run_id_value,
                "name": artifact_name
            }
        )

        db_response = await apply_run_tags_to_db_response(flow_name, run_key, self._run_table, db_response)

        return db_response.body

    async def get_run_metadata(self, flow_name, run_key, metadata_name):
        """
        Find the metadata records of a run to perform the search over.

        Returns the body of the metadata table response for the given flow
        and run, restricted to records whose field_name is `metadata_name`.
        """
        run_id_key, run_id_value = translate_run_key(run_key)

        db_response = await self._metadata_table.get_records(
            filter_dict={
                "flow_id": flow_name,
                run_id_key: run_id_value,
                "field_name": metadata_name
            }
        )

        return db_response.body

    def union_results(self, new_results: list, request_identifier: str):
        """
        Union new results with existing results, and update the keyset to prevent duplicate results.
        Key is determined by step_name and task_id.

        Parameters
        ----------
        new_results: List
            result dicts, each containing at least 'step_name' and 'task_id'
        request_identifier: str
            identifier of the request whose accumulated results are updated
        """
        # Look duplicates up in this request's own keyset; the outer dict is
        # keyed by request identifier and never contains step/task keys.
        seen_keys = self.search_result_keyset[request_identifier]
        request_results = self.search_results[request_identifier]
        for result in new_results:
            # Formatting instead of concatenation keeps this working for
            # non-string task ids as well.
            key = f"{result['step_name']}/{result['task_id']}"
            if key in seen_keys:
                continue
            seen_keys.add(key)
            request_results.append(result)


# Utilities


def _artifact_pathspec(artifact):
    """
    Build the pathspec used to identify an artifact in search requests and results.

    Prefers run_id over run_number and task_name over task_id.
    """
    return "{flow_id}/{run_id}/{step_name}/{task_name}/{name}/{attempt_id}".format(
        flow_id=artifact['flow_id'],
        run_id=artifact.get('run_id') or artifact['run_number'],
        step_name=artifact['step_name'],
        task_name=artifact.get('task_name') or artifact['task_id'],
        name=artifact['name'],
        attempt_id=artifact['attempt_id'])


async def _search_dict_filter(artifacts, artifact_match_dict=None):
    """
    Combines search match data dict with a list of artifacts to create actual search results.

    Parameters
    ----------
    artifacts: List
        list of artifacts used to construct pathspecs

    artifact_match_dict: Dict
        dictionary of pathspec -based match data.
        example:
        {'FlowId/RunNumber/StepName/TaskId/ArtifactName/AttemptId': {'matches': boolean, 'included': boolean}}
        Matches: whether the search term matched the artifact content or not
        Included: Whether the artifact content was included in the search or not (was the content accessible at all)
        Defaults to no match data, which yields no results.

    Returns
    -------
    List
        example:
        [
            {
                'flow_id': str,
                'run_number': int,
                'step_name': str,
                'task_id': int,
                'searchable': boolean,
                'error': null
            }
        ]
        searchable: denotes whether the task had an artifact that could be searched or not.
        False in cases where the artifact could not be included in the search

        error: either null or error object with following structure
            {
                'id': str,
                'detail': str
            }
            example:
            {
                'id': 's3-access-denied',
                'detail': 's3://...'
            }
    """
    if not artifact_match_dict:
        return []

    results = []
    for artifact in artifacts:
        pathspec = _artifact_pathspec(artifact)
        if pathspec not in artifact_match_dict:
            continue
        match_data = artifact_match_dict[pathspec]
        # Report matching artifacts, and artifacts that could not be searched at all.
        if match_data['matches'] or not match_data['included']:
            results.append({
                **_artifact_result_format(artifact),
                "searchable": match_data['included'],
                "error": match_data.get('error')
            })

    return results


def _artifact_result_format(art):
    """Return the subset of an artifact record that is exposed in search results."""
    return {key: val for key, val in art.items() if key in _ARTIFACT_RESULT_KEYS}


def _metadata_result_format(meta):
    """Return the subset of a metadata record that is exposed in search results."""
    return {key: val for key, val in meta.items() if key in _METADATA_RESULT_KEYS}


def _parse_search_term(term: str) -> Tuple[str, str]:
    """
    Return search operator, and the parsed search term.

    A term wrapped in double quotes is an exact ("eq") search for the quoted
    text; anything else is a partial ("co") search for the term as given.
    """
    # TODO: extend parsing to all predicates, not just eq&co
    # Require two quote characters so that a lone '"' is not treated as a
    # quoted empty string.
    exact_search = len(term) >= 2 and term.startswith("\"") and term.endswith("\"")
    if exact_search:
        return "eq", term[1:-1]
    return "co", term


def _decode_url_param_value(value):
    """
    Decode url param value that is encoded multiple times.

    Returns None for a missing or empty value.
    """
    if not value:
        return None
    DECODE_ROUND = 3
    for _ in range(DECODE_ROUND):
        value = unquote_plus(value)
    return value


def _construct_requset_identifier(request):
    """
    Extracts the flow_name, run_key, key and value from the request
    to construct a unique key for the search result.
    """
    flow_name = request.match_info['flow_id']
    run_key = request.match_info['run_number']
    key = request.query['key']
    value = _decode_url_param_value(request.query.get('value', None))
    return f"{flow_name}/{run_key} {key}:{value}"