services/ui_backend_service/api/admin.py (155 lines of code) (raw):

import os
import hashlib
import asyncio

from aiohttp import web
from multidict import MultiDict

from services.utils import (METADATA_SERVICE_HEADER, METADATA_SERVICE_VERSION,
                            SERVICE_BUILD_TIMESTAMP, SERVICE_COMMIT_HASH,
                            web_response)
from .utils import get_json_config

UI_SERVICE_VERSION = "{metadata_v}-{timestamp}-{commit}".format(
    metadata_v=METADATA_SERVICE_VERSION,
    timestamp=SERVICE_BUILD_TIMESTAMP or "",
    commit=SERVICE_COMMIT_HASH or ""
)

# Comparison operators accepted by /notifications query filters, keyed by the
# suffix used in the query key (``field:op``). A bare ``field`` means ``eq``.
_COMPARE_OPERATORS = {
    "eq": lambda a, b: a == b,
    "ne": lambda a, b: a != b,
    "lt": lambda a, b: a < b,
    "le": lambda a, b: a <= b,
    "gt": lambda a, b: a > b,
    "ge": lambda a, b: a >= b,
}


class AdminApi(object):
    """
    Provides administrative routes for the UI Service, such as health checks,
    version info and custom navigation links.
    """

    def __init__(self, app, cache_store):
        self.cache_store = cache_store
        app.router.add_route("GET", "/ping", self.ping)
        app.router.add_route("GET", "/version", self.version)
        app.router.add_route("GET", "/links", self.links)
        app.router.add_route("GET", "/notifications", self.get_notifications)
        app.router.add_route("GET", "/status", self.status)

        defaults = [
            {"href": 'https://docs.metaflow.org/', "label": 'Documentation'},
            {"href": 'http://chat.metaflow.org/', "label": 'Help'}
        ]

        self.notifications = _get_notifications_config() or []
        self.navigation_links = _get_links_config() or defaults

    async def version(self, request):
        """
        ---
        description: Returns the version of the UI service (metadata service version, build timestamp and commit hash)
        tags:
        - Admin
        produces:
        - 'text/plain'
        responses:
            "200":
                description: successful operation. Return the version number
            "405":
                description: invalid HTTP Method
        """
        return web.Response(text=str(UI_SERVICE_VERSION))

    async def ping(self, request):
        """
        ---
        description: This endpoint allows testing that the service is up.
        tags:
        - Admin
        produces:
        - 'text/plain'
        responses:
            "200":
                description: successful operation. Return "pong" text
            "405":
                description: invalid HTTP Method
        """
        return web.Response(text="pong", headers=MultiDict(
            {METADATA_SERVICE_HEADER: METADATA_SERVICE_VERSION}))

    async def links(self, request):
        """
        ---
        description: Provides custom navigation links for UI.
        tags:
        - Admin
        produces:
        - 'application/json'
        responses:
            "200":
                description: Returns the custom navigation links for UI
                schema:
                    $ref: '#/definitions/ResponsesLinkList'
            "405":
                description: invalid HTTP Method
        """
        return web_response(status=200, body=self.navigation_links)

    async def get_notifications(self, request):
        """
        ---
        description: Provides System Notifications for the UI
        tags:
        - Admin
        produces:
        - 'application/json'
        responses:
            "200":
                description: Returns the list of system notifications, optionally filtered by query parameters
                schema:
                    $ref: '#/definitions/ResponsesNotificationList'
            "405":
                description: invalid HTTP Method
        """
        # Normalise configured entries, dropping the ones that are unusable.
        processed_notifications = [
            processed
            for processed in map(_process_notification, self.notifications)
            if processed is not None
        ]
        # Filter by query parameters (field or field:op, op in eq/ne/lt/le/gt/ge).
        matching = [
            notification
            for notification in processed_notifications
            if _matches_query(notification, request.query)
        ]
        return web_response(status=200, body=matching)

    async def status(self, request):
        """
        ---
        description: Display system status information, such as cache
        tags:
        - Admin
        produces:
        - 'application/json'
        responses:
            "200":
                description: Return system status information, such as cache
            "405":
                description: invalid HTTP Method
        """
        cache_status = {}
        for store in [self.cache_store.artifact_cache,
                      self.cache_store.dag_cache,
                      self.cache_store.log_cache]:
            cache = store.cache

            try:
                # Use client ping to verify communication, True = ok
                await cache.ping()
                ping = True
            except Exception as ex:
                ping = str(ex)

            try:
                # Use the check action to verify cache communication, True = ok
                await cache.request_and_return([cache.check()], None)
                check = True
            except Exception as ex:
                check = str(ex)

            # Worker subprocesses of the cache server, as `ps` output lines
            # (or an error message string when they cannot be listed).
            cache_server_pid = cache._proc.pid if cache._proc else None
            if cache_server_pid:
                try:
                    worker_list = await _list_cache_workers(cache_server_pid)
                except Exception as ex:
                    worker_list = str(ex)
            else:
                worker_list = "Unable to get cache server pid"

            # Disk usage of the cache root as reported by `du -s`, or an
            # error message string when it cannot be measured.
            try:
                current_size = await _cache_disk_usage(cache._root)
            except Exception as ex:
                current_size = str(ex)

            cache_status[store.__class__.__name__] = {
                "restart_requested": cache._restart_requested,
                "is_alive": cache._is_alive,
                "pending_requests": list(cache.pending_requests),
                "root": cache._root,
                "prev_is_alive": cache._prev_is_alive,
                "action_classes": list(map(lambda cls: cls.__name__, cache._action_classes)),
                "max_actions": cache._max_actions,
                "max_size": cache._max_size,
                "current_size": current_size,
                "ping": ping,
                "check_action": check,
                "proc": {
                    "pid": cache._proc.pid,
                    "returncode": cache._proc.returncode,
                } if cache._proc else None,
                "workers": worker_list
            }

        return web_response(status=200, body={
            "cache": cache_status
        })


def _process_notification(notification):
    """Normalise one configured notification into the API response shape.

    Returns None when the entry must be skipped: it is not a dict, it has no
    "message" key, or its "created" value (falling back to "start" only when
    the "created" key is absent) is missing or falsy.
    """
    if not isinstance(notification, dict) or "message" not in notification:
        return None

    created = notification.get("created", notification.get("start", None))
    if not created:
        return None

    if "id" in notification:
        notification_id = notification["id"]
    else:
        # No configured id: derive one from the entry's content.
        notification_id = hashlib.sha1(
            str(notification).encode('utf-8')).hexdigest()

    return {
        "id": notification_id,
        "type": notification.get("type", "info"),
        "contentType": notification.get("contentType", "text"),
        "message": notification.get("message", ""),
        "url": notification.get("url", None),
        "urlText": notification.get("urlText", None),
        "created": created,
        "start": notification.get("start", None),
        "end": notification.get("end", None)
    }


def _matches_query(notification, query):
    """Return False if *notification* fails any supported filter in *query*.

    Each query key is ``field`` (implies ``eq``) or ``field:op`` where op is
    one of eq, ne, lt, le, gt, ge. The query value is cast to the type of the
    notification's field value before comparing. A single filter is ignored
    (the others are still evaluated) when its operator is unknown, the
    notification's field value is falsy, the cast value is falsy, or the value
    cannot be cast/compared.
    """
    for key in query.keys():
        if ":" in key:
            field, op = key.split(":", 1)
        else:
            field, op = key, "eq"

        compare = _COMPARE_OPERATORS.get(op)
        if compare is None:
            continue

        field_val = notification.get(field, None)
        if not field_val:
            continue

        try:
            comp_val = type(field_val)(query.get(key, None))
            if not comp_val:
                continue
            if not compare(field_val, comp_val):
                return False
        except (TypeError, ValueError):
            # Malformed filter value: skip only this filter instead of
            # abandoning the remaining ones.
            continue
    return True


async def _run_command(*args):
    """Run *args* directly (no shell) and return its decoded stdout."""
    proc = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE)
    stdout, _ = await proc.communicate()
    return stdout.decode()


async def _list_cache_workers(cache_server_pid):
    """Return `ps` output lines for the direct children of *cache_server_pid*.

    Returns an empty list when `pgrep` prints no child pids.
    """
    child_pids = (await _run_command(
        "pgrep", "-P", str(cache_server_pid))).splitlines()
    if not child_pids:
        return []
    output = await _run_command(
        "ps", "-p", ",".join(child_pids),
        "-o", "pid,%cpu,%mem,stime,time,command")
    return output.splitlines()


async def _cache_disk_usage(cache_root):
    """Return the `du -s` size of *cache_root*, or 0 if `du` printed nothing.

    The value is in du's default block units (1 KiB blocks with GNU
    coreutils), not bytes.
    """
    output = await _run_command("du", "-s", os.path.abspath(cache_root))
    if not output:
        return 0
    # `du -s` prints "<size>\t<path>"; keep only the size column.
    return int(output.split("\t", 1)[0])


def _get_links_config():
    return get_json_config("custom_quicklinks")


def _get_notifications_config():
    return get_json_config("notifications")