azext_edge/edge/providers/check/opcua.py (77 lines of code) (raw):

# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------

from rich.padding import Padding
from typing import Any, Dict, List

from ..base import get_namespaced_pods_by_prefix
from .base import (
    CheckManager,
    check_post_deployment,
    filter_resources_by_name,
    evaluate_pod_health,
    get_resources_grouped_by_namespace,
)
from .common import (
    PADDING_SIZE,
    CoreServiceResourceKinds,
    ResourceOutputDetailLevel,
)
from ..support.connectors import OPC_NAME_VAR_LABEL, OPCUA_NAME_LABEL


def check_opcua_deployment(
    as_list: bool = False,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_kinds: List[str] = None,
    resource_name: str = None,
) -> List[dict]:
    """Run the OPC UA post-deployment checks.

    Registers ``evaluate_core_service_runtime`` as the evaluator for the
    runtime resource kind and hands everything over to
    ``check_post_deployment``.

    Args:
        as_list: Forwarded unchanged to ``check_post_deployment``.
        detail_level: Forwarded unchanged to ``check_post_deployment``.
        resource_kinds: Forwarded unchanged to ``check_post_deployment``.
        resource_name: Forwarded unchanged to ``check_post_deployment``.

    Returns:
        Whatever ``check_post_deployment`` returns.
    """
    return check_post_deployment(
        evaluate_funcs={
            CoreServiceResourceKinds.RUNTIME_RESOURCE: evaluate_core_service_runtime,
        },
        as_list=as_list,
        detail_level=detail_level,
        resource_kinds=resource_kinds,
        resource_name=resource_name,
    )


def evaluate_core_service_runtime(
    as_list: bool = False,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_name: str = None,
) -> Dict[str, Any]:
    """Evaluate the health of the OPC UA broker runtime pods.

    Pods are collected once per label selector (``OPC_NAME_VAR_LABEL`` then
    ``OPCUA_NAME_LABEL``) using an empty prefix and an empty namespace, and
    are optionally narrowed down with ``filter_resources_by_name``. Every
    namespace group then gets its own target, a heading, and a call to
    ``evaluate_pod_health``.

    Args:
        as_list: Passed to ``CheckManager.as_dict`` when building the result.
        detail_level: Passed to ``evaluate_pod_health``.
        resource_name: When truthy, pods are filtered by this name.

    Returns:
        The result of ``check_manager.as_dict(as_list)``.
    """
    manager = CheckManager(
        check_name="evalCoreServiceRuntime",
        check_desc="Evaluate OPC UA broker core service",
    )
    target = CoreServiceResourceKinds.RUNTIME_RESOURCE.value
    base_indent = 6

    # Gather pods for both label selectors, preserving selector order.
    runtime_pods: List[dict] = [
        pod
        for selector in (OPC_NAME_VAR_LABEL, OPCUA_NAME_LABEL)
        for pod in get_namespaced_pods_by_prefix(
            prefix="",
            namespace="",
            label_selector=selector,
        )
    ]

    if resource_name:
        runtime_pods = filter_resources_by_name(
            resources=runtime_pods,
            resource_name=resource_name,
        )

    if not runtime_pods:
        # Nothing found: record a namespace-less target carrying only a notice.
        # Execution deliberately falls through to the grouping loop below.
        manager.add_target(target_name=target)
        manager.add_display(
            target_name=target,
            display=Padding("Unable to fetch pods.", (0, 0, 0, base_indent + 2)),
        )

    for namespace, namespace_pods in get_resources_grouped_by_namespace(runtime_pods):
        heading = Padding(
            f"OPC UA broker runtime resources in namespace {{[purple]{namespace}[/purple]}}",
            (0, 0, 0, base_indent),
        )
        manager.add_target(target_name=target, namespace=namespace)
        manager.add_display(target_name=target, namespace=namespace, display=heading)

        # Per-pod output is indented one step further than the heading.
        evaluate_pod_health(
            check_manager=manager,
            target=target,
            namespace=namespace,
            padding=base_indent + PADDING_SIZE,
            detail_level=detail_level,
            pods=namespace_pods,
        )

    return manager.as_dict(as_list)