services/ui_backend_service/api/card.py (184 lines of code) (raw):

import asyncio
import time
from typing import Dict, Optional

from aiohttp import web

from services.data.db_utils import translate_run_key, translate_task_key
from services.ui_backend_service.data import unpack_processed_value
from services.utils import handle_exceptions
from .utils import (
    format_response_list,
    get_pathspec_from_request,
    query_param_enabled,
    web_response,
    DBPagination,
    DBResponse,
)
from services.ui_backend_service.data.cache.card_cache_manager import (
    wait_until_card_is_ready,
    CARD_API_HTML_WAIT_TIME,
)
from services.ui_backend_service.data.cache.card_cache_manager import (
    list_cards as list_cards_from_cache,
)

# Pagination bounds for the card list endpoint.
_DEFAULT_PAGE_LIMIT = 1000
_MAX_PAGE_LIMIT = 10000


class CardsApi(object):
    """HTTP handlers for listing a task's cards and fetching card HTML/data.

    On construction, three GET routes are registered on ``app.router``:
    the card list, the card HTML by hash, and the card data by hash.
    """

    def __init__(self, app, db, cache=None):
        self.db = db
        # `cache` is expected to expose a `card_cache` attribute; when it does
        # not (or `cache` is None) `self.cache` ends up as None.
        self.cache = getattr(cache, "card_cache", None)

        app.router.add_route(
            "GET",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards",
            self.get_cards_list_for_task,
        )
        app.router.add_route(
            "GET",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{hash}",
            self.get_card_content_by_hash,
        )
        app.router.add_route(
            "GET",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{hash}/data",
            self.get_card_data_by_hash,
        )

    async def get_task_by_request(self, request):
        """Look up the task addressed by the request's path parameters.

        Returns the body of the database response when its response code is
        200, otherwise None.
        """
        flow_id, run_number, step_name, task_id, _ = get_pathspec_from_request(request)

        # Run and task identifiers can map to different columns; the translate
        # helpers return the column name together with the value to match.
        run_id_key, run_id_value = translate_run_key(run_number)
        task_id_key, task_id_value = translate_task_key(task_id)

        conditions = [
            "flow_id = %s",
            "{run_id_key} = %s".format(run_id_key=run_id_key),
            "step_name = %s",
            "{task_id_key} = %s".format(task_id_key=task_id_key),
        ]
        values = [flow_id, run_id_value, step_name, task_id_value]
        db_response, *_ = await self.db.task_table_postgres.find_records(
            fetch_single=True,
            conditions=conditions,
            values=values,
            expanded=True,
        )
        if db_response.response_code == 200:
            return db_response.body
        return None

    @handle_exceptions
    async def get_cards_list_for_task(self, request):
        """
        ---
        description: Get all identifiers of cards for a task
        tags:
        - Card
        parameters:
          - $ref: '#/definitions/Params/Path/flow_id'
          - $ref: '#/definitions/Params/Path/run_number'
          - $ref: '#/definitions/Params/Path/step_name'
          - $ref: '#/definitions/Params/Path/task_id'
          - $ref: '#/definitions/Params/Builtin/_page'
          - $ref: '#/definitions/Params/Builtin/_limit'
          - $ref: '#/definitions/Params/Custom/invalidate'
        produces:
        - application/json
        responses:
            "200":
                description: Returns a list of cards for the specified task
                schema:
                  $ref: '#/definitions/ResponsesCardList'
            "404":
                description: Task was not found.
            "405":
                description: invalid HTTP Method
                schema:
                  $ref: '#/definitions/ResponsesError405'
        """
        task = await self.get_task_by_request(request)
        if not task:
            return web_response(404, {"data": []})

        cards = await get_card_list(self.cache, task, max_wait_time=1)

        if cards is None:
            # Handle edge: Cache failed to return anything, even errors.
            # NOTE: choice of status 200 here is quite arbitrary, as the cache returning None is usually
            # caused by a premature request, and cards are not permanently missing.
            return web_response(200, {"data": []})

        card_hashes = [
            {"id": data["id"], "hash": card_hash, "type": data["type"]}
            for card_hash, data in cards.items()
        ]

        # paginate list of cards
        limit, page, offset = get_pagination_params(request)
        # Ceiling division so a trailing partial page is counted; an empty
        # list is still reported as a single page.
        total_pages = max((len(card_hashes) + limit - 1) // limit, 1)
        limited_set = card_hashes[offset:offset + limit]

        response = DBResponse(200, limited_set)
        pagination = DBPagination(limit, offset, len(limited_set), page)
        status, body = format_response_list(
            request, response, pagination, page, total_pages
        )
        return web_response(status, body)

    @handle_exceptions
    async def get_card_content_by_hash(self, request):
        """
        ---
        description: Get specific card of a task
        tags:
        - Card
        parameters:
          - $ref: '#/definitions/Params/Path/flow_id'
          - $ref: '#/definitions/Params/Path/run_number'
          - $ref: '#/definitions/Params/Path/step_name'
          - $ref: '#/definitions/Params/Path/task_id'
          - $ref: '#/definitions/Params/Custom/invalidate'
        produces:
        - text/html
        responses:
            "200":
                description: Returns the HTML content of a card with the specific hash
            "404":
                description: Card was not found.
            "405":
                description: invalid HTTP Method
                schema:
                  $ref: '#/definitions/ResponsesError405'
        """
        card_hash = request.match_info.get("hash")
        task = await self.get_task_by_request(request)
        if not task:
            return web.Response(
                content_type="text/html", status=404, body="Task not found."
            )

        cards = await get_card_html_for_task_async(
            self.cache,
            task,
            card_hash,
        )

        if cards is None:
            return web.Response(
                content_type="text/html",
                status=404,
                body="Card not found for task. Possibly still being processed. Please refresh page to check again.",
            )

        # NOTE(review): this lookup assumes the helper returned a mapping of
        # card hash -> {"html": ...}; confirm against wait_until_card_is_ready.
        if cards and card_hash in cards:
            return web.Response(content_type="text/html", body=cards[card_hash]["html"])

        return web.Response(
            content_type="text/html",
            status=404,
            body="Card not found for task.",
        )

    @handle_exceptions
    async def get_card_data_by_hash(self, request):
        """
        ---
        description: Get the data of a card created for a task. Contains any additional updates needed by the card.
        tags:
        - Card
        parameters:
          - $ref: '#/definitions/Params/Path/flow_id'
          - $ref: '#/definitions/Params/Path/run_number'
          - $ref: '#/definitions/Params/Path/step_name'
          - $ref: '#/definitions/Params/Path/task_id'
          - $ref: '#/definitions/Params/Path/hash'
        produces:
        - application/json
        responses:
            "200":
                description: Returns the data object created by the realtime card with the specific hash
            "404":
                description: Card data was not found.
            "405":
                description: invalid HTTP Method
                schema:
                  $ref: '#/definitions/ResponsesError405'
        """
        card_hash = request.match_info.get("hash")
        task = await self.get_task_by_request(request)
        if not task:
            # NOTE(review): this 404 is text/html although the endpoint
            # advertises application/json; kept as-is for existing clients.
            return web.Response(
                content_type="text/html", status=404, body="Task not found."
            )

        data = await get_card_data_for_task_async(
            self.cache,
            task,
            card_hash,
        )

        if data is None:
            return web_response(404, {"error": "Card data not found for task"})
        return web_response(200, data)


def _pathspec_from_task(task):
    """Build the "flow_id/run/step_name/task" pathspec string for a task record.

    "run_id" and "task_name" take precedence; "run_number" and "task_id" are
    used when those are missing or falsy.
    """
    return "{flow_id}/{run_id}/{step_name}/{task_id}".format(
        flow_id=task.get("flow_id"),
        run_id=task.get("run_id") or task.get("run_number"),
        step_name=task.get("step_name"),
        task_id=task.get("task_name") or task.get("task_id"),
    )


def _card_data_from_cache(local_cache):
    """Read card data from a local cache object.

    Returns a dict with the keys "data", "id" and "type", or None when
    ``local_cache.read_data()`` returns None.
    """
    data = local_cache.read_data()
    if data is None:
        return None
    return {
        "data": data["data"],
        "id": local_cache.card_id,
        "type": local_cache.card_type,
    }


async def get_card_html_for_task_async(
    cache_client,
    task,
    card_hash,
) -> Optional[Dict[str, Dict]]:
    """
    Wait for the card identified by `card_hash` to become ready and return
    the result of `wait_until_card_is_ready`, unchanged.

    The wait is bounded by CARD_API_HTML_WAIT_TIME.

    NOTE(review): the caller treats a None result as "not ready / not found"
    and otherwise indexes the result by card hash to reach an "html" entry;
    confirm the exact shape against `wait_until_card_is_ready`.
    """
    pathspec = _pathspec_from_task(task)
    local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
    return await wait_until_card_is_ready(
        cache_client.cache_manager,
        local_cache,
        max_wait_time=CARD_API_HTML_WAIT_TIME,
    )


async def get_card_data_for_task_async(
    cache_client,
    task,
    card_hash,
) -> Optional[Dict[str, Dict]]:
    """
    Return the card-data from the cache, or nothing.

    The task's pathspec is registered with the cache manager first. If the
    local cache is not yet readable, None is returned immediately.

    Example:
    --------
    {
        "id": 1,
        "type": "default",
        "data": {}
    }
    """
    pathspec = _pathspec_from_task(task)
    await cache_client.cache_manager.register(pathspec)
    local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
    if not local_cache.read_ready():
        # Since this is a data update call we can return a 404 and the client
        # should handle calling back so we only await at registration.
        return None
    return _card_data_from_cache(local_cache)


async def get_card_list(
    cache_client, task, max_wait_time=3
):
    """List the cards of a task via the card cache.

    Delegates to `list_cards_from_cache` with the task's pathspec and passes
    `max_wait_time` through; its result is returned unchanged.
    """
    pathspec = _pathspec_from_task(task)
    return await list_cards_from_cache(
        cache_client.cache_manager, pathspec, max_wait_time
    )


def _int_query_param(request, name, default):
    """Read an integer query parameter, falling back to `default` when the
    parameter is absent or cannot be parsed as an integer."""
    try:
        return int(request.query.get(name, default))
    except (TypeError, ValueError):
        return default


def get_pagination_params(request):
    """extract pagination params from request

    Returns a `(limit, page, offset)` tuple:

    - `page` comes from `_page` and is at least 1 (default 1).
    - `limit` comes from `_limit`, default 1000, clamped to 1..10000 so that
      callers can safely divide by it and slice with it.
    - `offset` is `limit * (page - 1)`.

    Values that are not valid integers fall back to the defaults.
    """
    # Page
    page = max(_int_query_param(request, "_page", 1), 1)

    # Limit
    # Default limit is 1000, maximum is 10_000, minimum is 1
    requested_limit = _int_query_param(request, "_limit", _DEFAULT_PAGE_LIMIT)
    limit = max(min(requested_limit, _MAX_PAGE_LIMIT), 1)

    # Offset
    offset = limit * (page - 1)

    return limit, page, offset


class CardException(Exception):
    """Error raised for card read failures.

    Carries a human-readable `message`, a string `id` identifying the error
    kind, and an optional `traceback_str`.
    """

    def __init__(
        self, msg="Failed to read card contents", id="card-error", traceback_str=None
    ):
        # Initialise the base class so `args` carries the message.
        super().__init__(msg)
        self.message = msg
        self.id = id
        self.traceback_str = traceback_str

    def __str__(self):
        return self.message