azext_edge/edge/providers/check/dataflow.py (1,356 lines of code) (raw):

# coding=utf-8 # ---------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License file in the project root for license information. # ---------------------------------------------------------------------------------------------- from typing import List from knack.log import get_logger from rich.padding import Padding from ...common import DEFAULT_DATAFLOW_PROFILE, CheckTaskStatus, ResourceState from ..base import get_namespaced_pods_by_prefix from ..edge_api.dataflow import DATAFLOW_API_V1, DataflowResourceKinds from ..support.dataflow import DATAFLOW_NAME_LABEL, DATAFLOW_OPERATOR_PREFIX, DATAFLOW_PROFILE_POD_PREFIX from .base import ( CheckManager, check_post_deployment, get_resources_by_name, get_resources_grouped_by_namespace, ) from .base.display import basic_property_display, colorize_string from .base.pod import evaluate_pod_health from .base.resource import filter_resources_by_name from .common import ( DEFAULT_PADDING, DEFAULT_PROPERTY_DISPLAY_COLOR, PADDING_SIZE, CoreServiceResourceKinds, DataFlowEndpointAuthenticationType, DataflowEndpointType, DataflowOperationType, ResourceOutputDetailLevel, ) logger = get_logger(__name__) PADDING = DEFAULT_PADDING INNER_PADDING = PADDING + PADDING_SIZE dataflow_api_check_name = "enumerateDataflowApi" dataflow_api_check_desc = "Enumerate Dataflow API resources" dataflow_runtime_check_name = "evalCoreServiceRuntime" dataflow_runtime_check_desc = "Evaluate Dataflow core service" dataflows_check_name = "evalDataflows" dataflows_check_desc = "Evaluate Dataflows" dataflow_endpoint_check_name = "evalDataflowEndpoints" dataflow_endpoint_check_desc = "Evaluate Dataflow Endpoints" dataflow_profile_check_name = "evalDataflowProfiles" dataflow_profile_check_desc = "Evaluate Dataflow Profiles" dataflow_target = "dataflows.connectivity.iotoperations.azure.com" dataflow_endpoint_target = "dataflowendpoints.connectivity.iotoperations.azure.com" dataflow_profile_target = "dataflowprofiles.connectivity.iotoperations.azure.com" valid_source_endpoint_types = [DataflowEndpointType.kafka.value, DataflowEndpointType.mqtt.value] def _process_dataflow_resource_status( check_manager: CheckManager, target_name: str, namespace: str, status: dict, resource_name: str, resource_kind: str, detail_level: int, padding: int, ): # status is overall optional, no evals if no status if not status: return inner_padding = padding + PADDING_SIZE provisioning_status_enum = CheckTaskStatus.skipped runtime_status_enum = CheckTaskStatus.skipped # provisioning Status (always optional) provisioning_status = status.get("provisioningStatus") if provisioning_status: # status - summary provisioning_status_status = provisioning_status.get("status") # override enum value with errors if they exist provisioning_status_error = provisioning_status.get("error") provisioning_status_failure_cause = provisioning_status.get("failureCause") provisioning_status_enum = ( CheckTaskStatus.error if provisioning_status_error or provisioning_status_failure_cause else ResourceState.map_to_status(provisioning_status_status) ) provisioning_status_display = f"Provisioning Status: {{{colorize_string(color=provisioning_status_enum.color, value=provisioning_status_status)}}}" # if not summary, display output / message if detail_level > ResourceOutputDetailLevel.summary.value: provisioning_status_output = provisioning_status.get("output", {}) # output object has no schema, but expecting "message" property to have details provisioning_status_output_message = provisioning_status_output.get("message") # if no "message" property, just use the entire object as output provisioning_status_output_display = provisioning_status_output_message or provisioning_status_output provisioning_status_display += f" - {colorize_string(provisioning_status_output_display)}" # status display check_manager.add_display( target_name=target_name, namespace=namespace, display=Padding(provisioning_status_display, (0, 0, 0, padding)), ) # display error, failure cause if provisioning_status_error: message = provisioning_status_error.get("message") code = provisioning_status_error.get("code") error_text = f"{code}: {message}" error_display = f"Error: {{{colorize_string(color=CheckTaskStatus.error.color, value=error_text)}}}" check_manager.add_display( target_name=target_name, namespace=namespace, display=Padding(error_display, (0, 0, 0, padding)) ) # show failure cause if provisioning_status_failure_cause: check_manager.add_display( target_name=target_name, namespace=namespace, display=basic_property_display( label="Failure Cause", value=provisioning_status_failure_cause, padding=inner_padding ), ) # logErrors on detail level 1 provisioning_status_log_errors = provisioning_status.get("logErrors") if detail_level > ResourceOutputDetailLevel.summary.value and provisioning_status_log_errors: check_manager.add_display( target_name=target_name, namespace=namespace, display=Padding("Log Errors:", (0, 0, 0, inner_padding)), ) # runtime status (required for profiles) runtime_status = status.get("runtimeStatus", {}) if runtime_status: # calculate status runtime_level = runtime_status.get("level") runtime_status_enum = ResourceState.map_to_status(runtime_level) description = runtime_status.get("description") # create display runtime_status_display = ( f"Runtime Status: {{{colorize_string(color=runtime_status_enum.color, value=runtime_status.get('level'))}}}" ) if description and detail_level > ResourceOutputDetailLevel.summary.value: runtime_status_display += f" - {colorize_string(description)}" check_manager.add_display( target_name=target_name, namespace=namespace, display=Padding(runtime_status_display, (0, 0, 0, padding)), ) elif resource_kind == DataflowResourceKinds.DATAFLOWPROFILE.value: runtime_status_enum = CheckTaskStatus.error # add evals if necessary if provisioning_status_enum != CheckTaskStatus.skipped: # todo - add error object to eval value? status_obj = {"status.provisioningStatus.status": provisioning_status_status} if provisioning_status_error: status_obj["status.provisioningStatus.error"] = provisioning_status_error check_manager.add_target_eval( target_name=target_name, namespace=namespace, status=provisioning_status_enum.value, resource_name=resource_name, resource_kind=resource_kind, value=status_obj, ) if runtime_status_enum != CheckTaskStatus.skipped: check_manager.add_target_eval( target_name=target_name, namespace=namespace, status=runtime_status_enum.value, resource_name=resource_name, resource_kind=resource_kind, value={"status.runtimeStatus.level": runtime_status.get("level")}, ) def _process_dataflow_sourcesettings( check_manager: CheckManager, target: str, namespace: str, dataflow_name: str, endpoints: List[dict], operation: dict, detail_level: int, padding: int, ): inner_padding = padding + PADDING_SIZE settings = operation.get("sourceSettings", {}) # show endpoint ref # TODO - lots of shared code for validating source/dest endpoints, consider refactoring endpoint_ref = settings.get("endpointRef") # currently we are only looking for endpoint references in the same namespace # duplicate names should not exist, so check the first endpoint that matches the name ref endpoint_ref_string = "not found" endpoint_ref_status = endpoint_type_status = CheckTaskStatus.error endpoint_type_status_string = "invalid" found_endpoint = next( (endpoint for endpoint in endpoints if "name" in endpoint and endpoint["name"] == endpoint_ref), None ) endpoint_type = found_endpoint["type"] if found_endpoint and "type" in found_endpoint else None if found_endpoint: endpoint_ref_status = CheckTaskStatus.success endpoint_ref_string = "detected" endpoint_type_valid = endpoint_type and endpoint_type.lower() in valid_source_endpoint_types endpoint_type_status = CheckTaskStatus.success if endpoint_type_valid else CheckTaskStatus.error endpoint_type_status_string = "valid" if endpoint_type_valid else f"has invalid type: {endpoint_type}" endpoint_ref_display = colorize_string(value=endpoint_ref_string, color=endpoint_ref_status.color) endpoint_validity_display = colorize_string( color=endpoint_type_status.color, value=endpoint_type_status_string ) # valid endpoint ref eval check_manager.add_target_eval( target_name=target, namespace=namespace, status=endpoint_ref_status.value, resource_name=dataflow_name, resource_kind=DataflowResourceKinds.DATAFLOW.value, value={"spec.operations[*].sourceSettings.endpointRef": endpoint_ref}, ) # valid source endpoint type eval check_manager.add_target_eval( target_name=target, namespace=namespace, status=endpoint_type_status.value, resource_name=dataflow_name, resource_kind=DataflowResourceKinds.DATAFLOW.value, value={"ref(spec.operations[*].sourceSettings.endpointRef).endpointType": endpoint_type}, ) if detail_level > ResourceOutputDetailLevel.summary.value: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("\nSource:", (0, 0, 0, padding)) ) endpoint_name_display = f"{{{colorize_string(value=endpoint_ref)}}}" check_manager.add_display( target_name=target, namespace=namespace, display=Padding( f"Dataflow Endpoint {endpoint_name_display} {endpoint_ref_display}, {endpoint_validity_display}", (0, 0, 0, padding + PADDING_SIZE), ), ) elif not found_endpoint or not endpoint_type_valid: check_manager.add_display( target_name=target, namespace=namespace, display=Padding( "[red]Invalid source endpoint reference[/red]", (0, 0, 0, padding - PADDING_SIZE) ), ) if detail_level > ResourceOutputDetailLevel.detail.value: for label, key in [ # TODO - validate asset ref / colorize ("DeviceRegistry Asset Reference", "assetRef"), ("Schema Reference", "schemaRef"), ("Serialization Format", "serializationFormat"), ]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=inner_padding), ) # data source strings - not on summary if detail_level > ResourceOutputDetailLevel.summary.value: data_sources = settings.get("dataSources", []) if data_sources: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Data Sources:", (0, 0, 0, inner_padding)), ) for data_source in data_sources: check_manager.add_display( target_name=target, namespace=namespace, display=Padding(f"- {colorize_string(data_source)}", (0, 0, 0, inner_padding + 2)), ) def _process_dataflow_transformationsettings( check_manager: CheckManager, target: str, namespace: str, resource: dict, detail_level: int, padding: int ): settings = resource.get("builtInTransformationSettings", {}) # only show details on non-summary if detail_level > ResourceOutputDetailLevel.summary.value: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("\nBuilt-In Transformation:", (0, 0, 0, padding)), ) padding += PADDING_SIZE inner_padding = padding + PADDING_SIZE def _process_inputs(inputs: List[str]): if inputs: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Inputs:", (0, 0, 0, inner_padding)), ) for input in inputs: check_manager.add_display( target_name=target, namespace=namespace, display=Padding(f"- {colorize_string(input)}", (0, 0, 0, inner_padding + 2)), ) # extra properties for datasets_label, key in [ ("Schema Reference", "schemaRef"), ("Serialization Format", "serializationFormat"), ]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=datasets_label, value=val, padding=padding), ) # only show datasets, filters, maps on verbose if detail_level > ResourceOutputDetailLevel.detail.value: # datasets datasets = settings.get("datasets", []) if datasets: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Datasets:", (0, 0, 0, padding)) ) for dataset in datasets: for label, key in [ ("Description", "description"), ("Key", "key"), ("Expression", "expression"), ("Schema", "schemaRef"), ]: val = dataset.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=inner_padding), ) inputs = dataset.get("inputs", []) _process_inputs(inputs) # filters filters = settings.get("filter", []) if filters: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Filters:", (0, 0, 0, padding)) ) for filter in filters: for datasets_label, key in [ ("Description", "description"), ("Expression", "expression"), ("Operation Type", "type"), ]: val = filter.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=datasets_label, value=val, padding=padding), ) inputs = filter.get("inputs", []) _process_inputs(inputs) # maps maps = settings.get("map", []) if maps: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Maps:", (0, 0, 0, padding)) ) for map in maps: for label, key in [ ("Description", "description"), ("Expression", "expression"), ("Output", "output"), ("Transformation Type", "type"), ]: val = map.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=inner_padding), ) inputs = map.get("inputs", []) _process_inputs(inputs) def _process_dataflow_destinationsettings( check_manager: CheckManager, target: str, namespace: str, dataflow_name: str, endpoints: List[dict], operation: dict, detail_level: int, padding: int, ): settings = operation.get("destinationSettings", {}) if detail_level > ResourceOutputDetailLevel.summary.value: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("\nDestination:", (0, 0, 0, padding)) ) endpoint_ref = settings.get("endpointRef") # currently we are only looking for endpoint references in the same namespace # duplicate names should not exist, so check the first endpoint that matches the name ref endpoint_match = next( (endpoint for endpoint in endpoints if "name" in endpoint and endpoint["name"] == endpoint_ref), None ) endpoint_validity = "valid" endpoint_status = CheckTaskStatus.success if not endpoint_match: endpoint_validity = "not found" endpoint_status = CheckTaskStatus.error # valid endpoint ref eval check_manager.add_target_eval( target_name=target, namespace=namespace, status=endpoint_status.value, resource_name=dataflow_name, resource_kind=DataflowResourceKinds.DATAFLOW.value, value={"spec.operations[*].destinationSettings.endpointRef": endpoint_ref}, ) # show dataflow endpoint ref on detail if detail_level > ResourceOutputDetailLevel.summary.value: padding += PADDING_SIZE endpoint_name_display = f"{{{colorize_string(value=endpoint_ref)}}}" endpoint_validity_display = colorize_string(color=endpoint_status.color, value=endpoint_validity) check_manager.add_display( target_name=target, namespace=namespace, display=Padding( f"Dataflow Endpoint {endpoint_name_display} {endpoint_validity_display}", (0, 0, 0, padding), ), ) elif not endpoint_match: check_manager.add_display( target_name=target, namespace=namespace, display=Padding( "[red]Invalid destination endpoint reference[/red]", (0, 0, 0, padding - PADDING_SIZE) ), ) # only show destination on verbose if detail_level > ResourceOutputDetailLevel.detail.value: for label, key in [ ("Data Destination", "dataDestination"), ]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) def _process_endpoint_authentication( endpoint_settings: dict, check_manager: CheckManager, target: str, namespace: str, padding: int, detail_level: int ) -> None: auth_property_dict = { DataFlowEndpointAuthenticationType.access_token.value: { "key": "accessTokenSettings", "displays": [ ("Secret Reference", "secretRef"), ], }, DataFlowEndpointAuthenticationType.system_assigned.value: { "key": "systemAssignedManagedIdentitySettings", "displays": [ ("Audience", "audience"), ], }, DataFlowEndpointAuthenticationType.user_assigned.value: { "key": "userAssignedManagedIdentitySettings", "displays": [ ("Client ID", "clientId"), ("Scope", "scope"), ("Tenant ID", "tenantId"), ], }, DataFlowEndpointAuthenticationType.x509.value: { "key": "x509CertificateSettings", "displays": [ ("Secret Ref", "secretRef"), ], }, DataFlowEndpointAuthenticationType.service_account_token.value: { "key": "serviceAccountTokenSettings", "displays": [ ("Audience", "audience"), ], }, DataFlowEndpointAuthenticationType.sasl.value: { "key": "saslSettings", "displays": [ ("Type", "saslType"), ("Secret Ref", "secretRef"), ], }, DataFlowEndpointAuthenticationType.anonymous.value: { "key": "anonymousSettings", "displays": [], }, } auth: dict = endpoint_settings.get("authentication", {}) auth_method = auth.get("method") # display unkown auth method if auth_method not in auth_property_dict: check_manager.add_display( target_name=target, namespace=namespace, display=Padding(f"[red]Unknown authentication method: {auth_method}", (0, 0, 0, padding)), ) return # display auth method check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label="Authentication Method", value=auth_method, padding=padding), ) # show details for various auth methods if detail_level > ResourceOutputDetailLevel.detail.value: auth_properties: dict = auth_property_dict.get(auth_method, {}) auth_settings_key = auth_properties.get("key") auth_obj = auth.get(auth_settings_key) if auth_obj: for label, key in auth_properties.get("displays", []): val = auth_obj.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding + PADDING_SIZE), ) def _process_endpoint_TLS( tls_settings: dict, check_manager: CheckManager, target: str, namespace: str, padding: int ) -> None: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("TLS:", (0, 0, 0, padding)), ) for label, key in [ ("Mode", "mode"), ("Trusted CA ConfigMap", "trustedCaCertificateConfigMapRef"), ]: # TODO - validate ref? val = tls_settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=(padding + PADDING_SIZE)), ) def _process_endpoint_mqttsettings( check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int ) -> None: settings = spec.get("mqttSettings", {}) for label, key in [ ("MQTT Host", "host"), ("Protocol", "protocol"), ]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) # endpoint authentication details _process_endpoint_authentication( endpoint_settings=settings, check_manager=check_manager, target=target, namespace=namespace, padding=INNER_PADDING, detail_level=detail_level, ) if detail_level > ResourceOutputDetailLevel.detail.value: for label, key in [ ("Cloud Event Attributes", "cloudEventAttributes"), ("Client ID Prefix", "clientIdPrefix"), ("Keep Alive (s)", "keepAliveSeconds"), ("Max Inflight Messages", "maxInflightMessages"), ("QOS", "qos"), ("Retain", "retain"), ("Session Expiry (s)", "sessionExpirySeconds"), ]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) # TLS tls = settings.get("tls", {}) if tls: _process_endpoint_TLS( tls_settings=tls, check_manager=check_manager, target=target, namespace=namespace, padding=padding, ) def _process_endpoint_kafkasettings( check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int ) -> None: inner_padding = padding + PADDING_SIZE settings = spec.get("kafkaSettings", {}) for label, key in [ ("Kafka Host", "host"), ("Consumer Group ID", "consumerGroupId"), ]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) # endpoint authentication details _process_endpoint_authentication( endpoint_settings=settings, check_manager=check_manager, target=target, namespace=namespace, padding=INNER_PADDING, detail_level=detail_level, ) if detail_level > ResourceOutputDetailLevel.detail.value: # extra properties for label, key in [ ("Cloud Event Attributes", "cloudEventAttributes"), ("Compression", "compression"), ("Copy MQTT Properties", "copyMqttProperties"), ("Acks", "kafkaAcks"), ("Partition Strategy", "partitionStrategy"), ]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) # TLS tls = settings.get("tls", {}) if tls: _process_endpoint_TLS( tls_settings=tls, check_manager=check_manager, target=target, namespace=namespace, padding=padding, ) # batching batching = settings.get("batching", {}) check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Batching:", (0, 0, 0, padding)), ) for label, key in [ ("Latency (ms)", "latencyMs"), ("Max Bytes", "maxBytes"), ("Max Messages", "maxMessages"), ("Mode", "mode"), ]: val = batching.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=inner_padding), ) def _process_endpoint_fabriconelakesettings( check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int ) -> None: settings = spec.get("fabricOneLakeSettings", {}) for label, key in [("Fabric Host", "host"), ("Path Type", "oneLakePathType")]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) # endpoint authentication details _process_endpoint_authentication( endpoint_settings=settings, check_manager=check_manager, target=target, namespace=namespace, padding=INNER_PADDING, detail_level=detail_level, ) if detail_level > ResourceOutputDetailLevel.detail.value: names = settings.get("names", {}) for label, key in [ ("Lakehouse Name", "lakehouseName"), ("Workspace Name", "workspaceName"), ]: val = names.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) batching = settings.get("batching", {}) check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Batching:", (0, 0, 0, padding)), ) padding += PADDING_SIZE for label, key in [ ("Latency (s)", "latencySeconds"), ("Max Messages", "maxMessages"), ]: val = batching.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) def _process_endpoint_datalakestoragesettings( check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int ) -> None: settings = spec.get("datalakeStorageSettings", {}) for label, key in [("DataLake Host", "host")]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) # endpoint authentication details _process_endpoint_authentication( endpoint_settings=settings, check_manager=check_manager, target=target, namespace=namespace, padding=INNER_PADDING, detail_level=detail_level, ) if detail_level > ResourceOutputDetailLevel.detail.value: batching = settings.get("batching", {}) check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Batching:", (0, 0, 0, padding)), ) padding += PADDING_SIZE for label, key in [ ("Latency (s)", "latencySeconds"), ("Max Messages", "maxMessages"), ]: val = batching.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) def _process_endpoint_dataexplorersettings( check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int ) -> None: settings = spec.get("dataExplorerSettings", {}) for label, key in [("Database Name", "database"), ("Data Explorer Host", "host")]: val = settings.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) # endpoint authentication details _process_endpoint_authentication( endpoint_settings=settings, check_manager=check_manager, target=target, namespace=namespace, padding=INNER_PADDING, detail_level=detail_level, ) if detail_level > ResourceOutputDetailLevel.detail.value: batching = settings.get("batching", {}) check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Batching:", (0, 0, 0, padding)), ) padding += PADDING_SIZE for label, key in [ ("Latency (s)", "latencySeconds"), ("Max Messages", "maxMessages"), ]: val = batching.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display(label=label, value=val, padding=padding), ) def _process_endpoint_localstoragesettings( check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int ) -> None: # TODO - validate reference settings = spec.get("localStorageSettings", {}) persistent_volume_claim = settings.get("persistentVolumeClaimRef") check_manager.add_display( target_name=target, namespace=namespace, display=Padding(f"Persistent Volume Claim: {persistent_volume_claim}", (0, 0, 0, padding)), ) # endpoint authentication details _process_endpoint_authentication( endpoint_settings=settings, check_manager=check_manager, target=target, namespace=namespace, padding=INNER_PADDING, detail_level=detail_level, ) def check_dataflows_deployment( as_list: bool = False, detail_level: int = ResourceOutputDetailLevel.summary.value, resource_kinds: List[str] = None, resource_name: str = None, ) -> List[dict]: evaluate_funcs = { CoreServiceResourceKinds.RUNTIME_RESOURCE: evaluate_core_service_runtime, DataflowResourceKinds.DATAFLOWPROFILE: evaluate_dataflow_profiles, DataflowResourceKinds.DATAFLOWENDPOINT: evaluate_dataflow_endpoints, DataflowResourceKinds.DATAFLOW: evaluate_dataflows, } return check_post_deployment( api_info=DATAFLOW_API_V1, check_name=dataflow_api_check_name, check_desc=dataflow_api_check_desc, evaluate_funcs=evaluate_funcs, as_list=as_list, detail_level=detail_level, resource_kinds=resource_kinds, resource_name=resource_name, ) def evaluate_core_service_runtime( as_list: bool = False, detail_level: int = ResourceOutputDetailLevel.summary.value, resource_name: str = None, ): check_manager = CheckManager( check_name=dataflow_runtime_check_name, check_desc=dataflow_runtime_check_desc, ) operators = get_namespaced_pods_by_prefix( prefix=DATAFLOW_OPERATOR_PREFIX, namespace="", label_selector=DATAFLOW_NAME_LABEL, ) if resource_name: operators = filter_resources_by_name( resources=operators, resource_name=resource_name, ) if not operators: check_manager.add_target(target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value) check_manager.add_display( target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value, display=Padding("Unable to fetch pods.", (0, 0, 0, PADDING)), ) for namespace, pods in get_resources_grouped_by_namespace(operators): check_manager.add_target( target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value, namespace=namespace, ) check_manager.add_display( target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value, namespace=namespace, display=Padding( f"Dataflow operator in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING), ), ) evaluate_pod_health( check_manager=check_manager, target=CoreServiceResourceKinds.RUNTIME_RESOURCE.value, pods=pods, namespace=namespace, padding=PADDING + 2, detail_level=detail_level, ) return check_manager.as_dict(as_list) def evaluate_dataflows( as_list: bool = False, detail_level: int = ResourceOutputDetailLevel.summary.value, resource_name: str = None, ): check_manager = CheckManager( check_name=dataflows_check_name, check_desc=dataflows_check_desc, ) all_dataflows = get_resources_by_name( api_info=DATAFLOW_API_V1, kind=DataflowResourceKinds.DATAFLOW, resource_name=resource_name, ) target = dataflow_target # No dataflows - skip if not all_dataflows: no_dataflows_text = "No Dataflow resources detected in any namespace." check_manager.add_target(target_name=target) check_manager.add_target_eval( target_name=target, status=CheckTaskStatus.skipped.value, value={"dataflows": no_dataflows_text} ) check_manager.add_display( target_name=target, display=Padding(no_dataflows_text, (0, 0, 0, PADDING)), ) return check_manager.as_dict(as_list=as_list) for namespace, dataflows in get_resources_grouped_by_namespace(all_dataflows): check_manager.add_target(target_name=target, namespace=namespace) check_manager.add_display( target_name=target, namespace=namespace, display=Padding(f"Dataflows in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING)), ) # conditions check_manager.add_target_conditions( target_name=target, namespace=namespace, conditions=[ # at least a source and destination operation "len(spec.operations)<=3", # valid source endpoint "spec.operations[*].sourceSettings.endpointRef", "ref(spec.operations[*].sourceSettings.endpointRef).endpointType in ('kafka','mqtt')", # valid destination endpoint "spec.operations[*].destinationSettings.endpointRef", # single source/destination "len(spec.operations[*].sourceSettings)==1", "len(spec.operations[*].destinationSettings)==1", ], ) # profile names for reference lookup all_profiles = get_resources_by_name( api_info=DATAFLOW_API_V1, kind=DataflowResourceKinds.DATAFLOWPROFILE, namespace=namespace, resource_name=None, ) profile_names = {profile.get("metadata", {}).get("name") for profile in all_profiles} all_endpoints = get_resources_by_name( api_info=DATAFLOW_API_V1, kind=DataflowResourceKinds.DATAFLOWENDPOINT, namespace=namespace, resource_name=None, ) endpoints = [ { "name": endpoint.get("metadata", {}).get("name"), "type": endpoint.get("spec", {}).get("endpointType"), } for endpoint in all_endpoints ] for dataflow in list(dataflows): spec = dataflow.get("spec", {}) dataflow_name = dataflow.get("metadata", {}).get("name") mode = spec.get("mode") mode_lower = str(mode).lower() if mode else "unknown" dataflow_enabled = mode_lower == "enabled" mode_display = colorize_string(value=mode_lower) check_manager.add_display( target_name=target, namespace=namespace, display=Padding( f"\n- Dataflow {{{colorize_string(value=dataflow_name)}}} is {mode_display}", (0, 0, 0, PADDING), ), ) # if dataflow is disabled, skip evaluations and displays if not dataflow_enabled: check_manager.add_target_eval( target_name=target, namespace=namespace, status=CheckTaskStatus.skipped.value, resource_name=dataflow_name, resource_kind=DataflowResourceKinds.DATAFLOW.value, value={"spec.mode": mode}, ) check_manager.add_display( target_name=target, namespace=namespace, display=Padding( colorize_string( value=f"{CheckTaskStatus.skipped.emoji} Skipping evaluation of disabled dataflow", color=CheckTaskStatus.skipped.color, ), (0, 0, 0, PADDING + 2), ), ) continue status = dataflow.get("status", {}) _process_dataflow_resource_status( check_manager=check_manager, target_name=target, namespace=namespace, status=status, resource_name=dataflow_name, resource_kind=DataflowResourceKinds.DATAFLOW.value, detail_level=detail_level, padding=INNER_PADDING, ) profile_ref = spec.get("profileRef") # profileRef is optional, only show an error if it exists but is invalid if profile_ref: profile_ref_status = ( CheckTaskStatus.error if profile_ref not in profile_names else CheckTaskStatus.success ) # valid profileRef eval check_manager.add_target_eval( target_name=target, namespace=namespace, status=profile_ref_status.value, resource_name=dataflow_name, resource_kind=DataflowResourceKinds.DATAFLOW.value, value={"spec.profileRef": profile_ref}, ) check_manager.add_display( target_name=target, namespace=namespace, display=Padding( f"Dataflow Profile: {{{colorize_string(color=profile_ref_status.color, value=profile_ref)}}}", (0, 0, 0, INNER_PADDING), ), ) if profile_ref_status == CheckTaskStatus.error: check_manager.add_display( target_name=target, namespace=namespace, display=Padding( colorize_string( color=profile_ref_status.color, value="Invalid Dataflow Profile reference" ), (0, 0, 0, INNER_PADDING), ), ) operations = spec.get("operations", []) # check operations count operations_status = CheckTaskStatus.success.value if not operations or not (2 <= len(operations) <= 3): operations_status = CheckTaskStatus.error.value check_manager.add_target_eval( target_name=target, namespace=namespace, status=operations_status, resource_name=dataflow_name, resource_kind=DataflowResourceKinds.DATAFLOW.value, value={"len(operations)": len(operations)}, ) if operations and detail_level > ResourceOutputDetailLevel.summary.value: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Operations:", (0, 0, 0, INNER_PADDING)), ) operation_padding = INNER_PADDING + PADDING_SIZE sources = destinations = 0 for operation in operations: op_type = operation.get("operationType", "").lower() if op_type == DataflowOperationType.source.value: sources += 1 _process_dataflow_sourcesettings( check_manager=check_manager, target=target, namespace=namespace, dataflow_name=dataflow_name, endpoints=endpoints, operation=operation, detail_level=detail_level, padding=operation_padding, ) elif op_type == DataflowOperationType.builtin_transformation.value: _process_dataflow_transformationsettings( check_manager=check_manager, target=target, namespace=namespace, resource=operation, detail_level=detail_level, padding=operation_padding, ) elif op_type == DataflowOperationType.destination.value: destinations += 1 _process_dataflow_destinationsettings( check_manager=check_manager, target=target, namespace=namespace, dataflow_name=dataflow_name, endpoints=endpoints, operation=operation, detail_level=detail_level, padding=operation_padding, ) # eval source amount (1) sources_status = destinations_status = CheckTaskStatus.success.value if sources != 1: sources_status = CheckTaskStatus.error.value message = ( "Missing source operation" if sources == 0 else f"Too many source operations: {sources}" ) check_manager.add_display( target_name=target, namespace=namespace, display=Padding(f"[red]{message}[/red]", (0, 0, 0, INNER_PADDING)), ) check_manager.add_target_eval( target_name=target, namespace=namespace, status=sources_status, resource_name=dataflow_name, resource_kind=DataflowResourceKinds.DATAFLOW.value, value={"len(spec.operations[*].sourceSettings)": sources}, ) if destinations != 1: destinations_status = CheckTaskStatus.error.value message = ( "Missing destination operation" if destinations == 0 else f"Too many destination operations: {destinations}" ) check_manager.add_display( target_name=target, namespace=namespace, display=Padding(f"[red]{message}[/red]", (0, 0, 0, INNER_PADDING)), ) check_manager.add_target_eval( target_name=target, namespace=namespace, status=destinations_status, resource_name=dataflow_name, resource_kind=DataflowResourceKinds.DATAFLOW.value, value={"len(spec.operations[*].destinationSettings)": destinations}, ) return check_manager.as_dict(as_list=as_list) def evaluate_dataflow_endpoints( as_list: bool = False, detail_level: int = ResourceOutputDetailLevel.summary.value, resource_name: str = None, ): check_manager = CheckManager( check_name=dataflow_endpoint_check_name, check_desc=dataflow_endpoint_check_desc, ) all_endpoints = get_resources_by_name( api_info=DATAFLOW_API_V1, kind=DataflowResourceKinds.DATAFLOWENDPOINT, resource_name=resource_name, ) target = dataflow_endpoint_target if not all_endpoints: no_endpoints_text = "No Dataflow Endpoints detected in any namespace." check_manager.add_target(target_name=target) check_manager.add_target_eval( target_name=target, status=CheckTaskStatus.skipped.value, value={"endpoints": no_endpoints_text} ) check_manager.add_display( target_name=target, display=Padding(no_endpoints_text, (0, 0, 0, PADDING)), ) return check_manager.as_dict(as_list=as_list) for namespace, endpoints in get_resources_grouped_by_namespace(all_endpoints): check_manager.add_target(target_name=target, namespace=namespace, conditions=["spec.endpointType"]) check_manager.add_display( target_name=target, namespace=namespace, display=Padding( f"Dataflow Endpoints in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING) ), ) for endpoint in list(endpoints): spec = endpoint.get("spec", {}) endpoint_name = endpoint.get("metadata", {}).get("name") endpoint_type = spec.get("endpointType") valid_endpoint_type = endpoint_type and endpoint_type.lower() in DataflowEndpointType.list() check_manager.add_target_eval( target_name=target, namespace=namespace, status=CheckTaskStatus.success.value if valid_endpoint_type else CheckTaskStatus.error.value, resource_name=endpoint_name, resource_kind=DataflowResourceKinds.DATAFLOWENDPOINT.value, value={"spec.endpointType": endpoint_type}, ) endpoint_string = f"Endpoint {{{colorize_string(value=endpoint_name)}}}" detected_string = colorize_string(color="green", value="detected") type_string = f"type: {colorize_string(color=DEFAULT_PROPERTY_DISPLAY_COLOR if valid_endpoint_type else 'red', value=endpoint_type)}" check_manager.add_display( target_name=target, namespace=namespace, display=Padding( f"\n- {endpoint_string} {detected_string}, {type_string}", (0, 0, 0, PADDING), ), ) status = endpoint.get("status", {}) _process_dataflow_resource_status( check_manager=check_manager, target_name=target, namespace=namespace, status=status, resource_name=endpoint_name, resource_kind=DataflowResourceKinds.DATAFLOWENDPOINT.value, detail_level=detail_level, padding=INNER_PADDING, ) # endpoint details at higher detail levels if detail_level > ResourceOutputDetailLevel.summary.value: endpoint_processor_dict = { DataflowEndpointType.mqtt.value: _process_endpoint_mqttsettings, DataflowEndpointType.kafka.value: _process_endpoint_kafkasettings, DataflowEndpointType.fabric_onelake.value: _process_endpoint_fabriconelakesettings, DataflowEndpointType.datalake.value: _process_endpoint_datalakestoragesettings, DataflowEndpointType.data_explorer.value: _process_endpoint_dataexplorersettings, DataflowEndpointType.local_storage.value: _process_endpoint_localstoragesettings, } # process endpoint settings if endpoint_type and endpoint_type.lower() in endpoint_processor_dict: endpoint_processor_dict[endpoint_type.lower()]( check_manager=check_manager, target=target, namespace=namespace, spec=spec, detail_level=detail_level, padding=INNER_PADDING, ) else: check_manager.add_display( target_name=target, namespace=namespace, display=Padding( colorize_string(color="red", value=f"Unknown endpoint type: {endpoint_type}"), (0, 0, 0, INNER_PADDING), ), ) return check_manager.as_dict(as_list=as_list) def evaluate_dataflow_profiles( as_list: bool = False, detail_level: int = ResourceOutputDetailLevel.summary.value, resource_name: str = None, ): check_manager = CheckManager( check_name=dataflow_profile_check_name, check_desc=dataflow_profile_check_desc, ) target = dataflow_profile_target all_profiles = get_resources_by_name( api_info=DATAFLOW_API_V1, kind=DataflowResourceKinds.DATAFLOWPROFILE, resource_name=resource_name, ) if not all_profiles: no_profiles_text = "No Dataflow Profiles detected in any namespace." check_manager.add_target(target_name=target) # if we may have manually filtered out the default profile by input, skip instead of warn default_profile_status = CheckTaskStatus.skipped if resource_name else CheckTaskStatus.warning check_manager.add_target_eval( target_name=target, status=default_profile_status.value, value={"profiles": no_profiles_text} ) check_manager.add_display( target_name=target, display=Padding(no_profiles_text, (0, 0, 0, PADDING)), ) return check_manager.as_dict(as_list=as_list) for namespace, profiles in get_resources_grouped_by_namespace(all_profiles): check_manager.add_target( target_name=target, namespace=namespace, conditions=["spec.instanceCount", f"[*].metadata.name=='{DEFAULT_DATAFLOW_PROFILE}'"], ) check_manager.add_display( target_name=target, namespace=namespace, display=Padding( f"Dataflow Profiles in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING) ), ) # warn if no default dataflow profile (unless possibly filtered) default_profile_status = CheckTaskStatus.skipped if resource_name else CheckTaskStatus.warning for profile in list(profiles): profile_name = profile.get("metadata", {}).get("name") # check for default dataflow profile if profile_name == DEFAULT_DATAFLOW_PROFILE: default_profile_status = CheckTaskStatus.success spec = profile.get("spec", {}) check_manager.add_display( target_name=target, namespace=namespace, display=Padding( f"\n- Profile {{{colorize_string(value=profile_name)}}} {colorize_string(color='green', value='detected')}", (0, 0, 0, PADDING), ), ) # evaluate status status = profile.get("status", {}) _process_dataflow_resource_status( check_manager=check_manager, target_name=target, namespace=namespace, status=status, resource_name=profile_name, resource_kind=DataflowResourceKinds.DATAFLOWPROFILE.value, detail_level=detail_level, padding=INNER_PADDING, ) # instance count instance_count = spec.get("instanceCount") has_instances = instance_count is not None and int(instance_count) >= 0 instance_status = CheckTaskStatus.success if has_instances else CheckTaskStatus.error check_manager.add_target_eval( target_name=target, namespace=namespace, status=instance_status.value, resource_name=profile_name, resource_kind=DataflowResourceKinds.DATAFLOWPROFILE.value, value={"spec.instanceCount": instance_count}, ) if has_instances: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display( label="Instance count", value=instance_count, color=instance_status.color, padding=INNER_PADDING, ), ) else: check_manager.add_display( target_name=target, namespace=namespace, display=Padding("[red]No instance count set[/red]", (0, 0, 0, INNER_PADDING)), ) # diagnostics on higher detail levels if detail_level > ResourceOutputDetailLevel.summary.value: log_padding = PADDING + PADDING_SIZE log_inner_padding = log_padding + PADDING_SIZE diagnostics = spec.get("diagnostics", {}) check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Diagnostic Logs:", (0, 0, 0, log_padding)), ) # diagnostic logs diagnostic_logs = diagnostics.get("logs", {}) diagnostic_log_level = diagnostic_logs.get("level") check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display( label="Log Level", value=diagnostic_log_level, padding=log_inner_padding ), ) if detail_level > ResourceOutputDetailLevel.detail.value: diagnostic_log_otelconfig = diagnostic_logs.get("openTelemetryExportConfig", {}) if diagnostic_log_otelconfig: for label, key in [ ("Endpoint", "otlpGrpcEndpoint"), ("Interval (s)", "intervalSeconds"), ("Level", "level"), ]: val = diagnostic_log_otelconfig.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display( label=label, value=val, padding=log_inner_padding ), ) # diagnostic metrics diagnostic_metrics = diagnostics.get("metrics", {}) check_manager.add_display( target_name=target, namespace=namespace, display=Padding("Diagnostic Metrics:", (0, 0, 0, log_padding)), ) diagnostic_metrics_prometheusPort = diagnostic_metrics.get("prometheusPort") check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display( label="Prometheus Port", value=diagnostic_metrics_prometheusPort, padding=log_inner_padding, ), ) diagnostic_metrics_otelconfig = diagnostic_metrics.get("openTelemetryExportConfig", {}) if diagnostic_metrics_otelconfig: for label, key in [ ("Endpoint", "otlpGrpcEndpoint"), ("Interval (s)", "intervalSeconds"), ]: val = diagnostic_metrics_otelconfig.get(key) if val: check_manager.add_display( target_name=target, namespace=namespace, display=basic_property_display( label=label, value=val, padding=log_inner_padding ), ) # pod health - trailing `-` is important in case profiles have similar prefixes pod_prefix = f"{DATAFLOW_PROFILE_POD_PREFIX}{profile_name}-" profile_pods = get_namespaced_pods_by_prefix( prefix=pod_prefix, namespace=namespace, label_selector=DATAFLOW_NAME_LABEL, ) # only show pods if they exist if profile_pods: evaluate_pod_health( check_manager=check_manager, target=target, pods=profile_pods, namespace=namespace, padding=INNER_PADDING, detail_level=detail_level, ) # default dataflow profile status, display warning if not success check_manager.add_target_eval( target_name=target, namespace=namespace, status=default_profile_status.value, resource_kind=DataflowResourceKinds.DATAFLOWPROFILE.value, resource_name=DEFAULT_DATAFLOW_PROFILE, value={f"[*].metadata.name=='{DEFAULT_DATAFLOW_PROFILE}'": default_profile_status.value}, ) if default_profile_status not in [CheckTaskStatus.success, CheckTaskStatus.skipped]: check_manager.add_display( target_name=target, namespace=namespace, display=Padding( colorize_string( color=default_profile_status.color, value=f"\nDefault Dataflow Profile '{DEFAULT_DATAFLOW_PROFILE}' not found in namespace '{namespace}'", ), (0, 0, 0, PADDING), ), ) return check_manager.as_dict(as_list=as_list)