# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
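"""Post-deployment checks for Azure IoT Operations dataflows.

Evaluates the dataflow operator runtime along with Dataflow, DataflowProfile,
and DataflowEndpoint resources, emitting display output and evaluation results
via CheckManager.
"""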
from typing import List, Optional
from knack.log import get_logger
from rich.padding import Padding
from ...common import DEFAULT_DATAFLOW_PROFILE, CheckTaskStatus, ResourceState
from ..base import get_namespaced_pods_by_prefix
from ..edge_api.dataflow import DATAFLOW_API_V1, DataflowResourceKinds
from ..support.dataflow import DATAFLOW_NAME_LABEL, DATAFLOW_OPERATOR_PREFIX, DATAFLOW_PROFILE_POD_PREFIX
from .base import (
CheckManager,
check_post_deployment,
get_resources_by_name,
get_resources_grouped_by_namespace,
)
from .base.display import basic_property_display, colorize_string
from .base.pod import evaluate_pod_health
from .base.resource import filter_resources_by_name
from .common import (
DEFAULT_PADDING,
DEFAULT_PROPERTY_DISPLAY_COLOR,
PADDING_SIZE,
CoreServiceResourceKinds,
DataFlowEndpointAuthenticationType,
DataflowEndpointType,
DataflowOperationType,
ResourceOutputDetailLevel,
)

logger = get_logger(__name__)
PADDING = DEFAULT_PADDING
INNER_PADDING = PADDING + PADDING_SIZE
dataflow_api_check_name = "enumerateDataflowApi"
dataflow_api_check_desc = "Enumerate Dataflow API resources"
dataflow_runtime_check_name = "evalCoreServiceRuntime"
dataflow_runtime_check_desc = "Evaluate Dataflow core service"
dataflows_check_name = "evalDataflows"
dataflows_check_desc = "Evaluate Dataflows"
dataflow_endpoint_check_name = "evalDataflowEndpoints"
dataflow_endpoint_check_desc = "Evaluate Dataflow Endpoints"
dataflow_profile_check_name = "evalDataflowProfiles"
dataflow_profile_check_desc = "Evaluate Dataflow Profiles"
dataflow_target = "dataflows.connectivity.iotoperations.azure.com"
dataflow_endpoint_target = "dataflowendpoints.connectivity.iotoperations.azure.com"
dataflow_profile_target = "dataflowprofiles.connectivity.iotoperations.azure.com"
valid_source_endpoint_types = [DataflowEndpointType.kafka.value, DataflowEndpointType.mqtt.value]


def _process_dataflow_resource_status(
check_manager: CheckManager,
target_name: str,
namespace: str,
status: dict,
resource_name: str,
resource_kind: str,
detail_level: int,
padding: int,
):
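    """Display and evaluate a dataflow resource's `status` object, if present.

    Renders `provisioningStatus` (including error, failureCause, and logErrors)
    and `runtimeStatus` (required for profiles), then adds a target eval for
    each sub-status that was evaluated.
    """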
    # status is optional overall; no evals when it's absent
if not status:
return
inner_padding = padding + PADDING_SIZE
provisioning_status_enum = CheckTaskStatus.skipped
runtime_status_enum = CheckTaskStatus.skipped
# provisioning Status (always optional)
provisioning_status = status.get("provisioningStatus")
if provisioning_status:
# status - summary
provisioning_status_status = provisioning_status.get("status")
# override enum value with errors if they exist
provisioning_status_error = provisioning_status.get("error")
provisioning_status_failure_cause = provisioning_status.get("failureCause")
provisioning_status_enum = (
CheckTaskStatus.error
if provisioning_status_error or provisioning_status_failure_cause
else ResourceState.map_to_status(provisioning_status_status)
)
provisioning_status_display = f"Provisioning Status: {{{colorize_string(color=provisioning_status_enum.color, value=provisioning_status_status)}}}"
# if not summary, display output / message
if detail_level > ResourceOutputDetailLevel.summary.value:
provisioning_status_output = provisioning_status.get("output", {})
# output object has no schema, but expecting "message" property to have details
provisioning_status_output_message = provisioning_status_output.get("message")
# if no "message" property, just use the entire object as output
provisioning_status_output_display = provisioning_status_output_message or provisioning_status_output
provisioning_status_display += f" - {colorize_string(provisioning_status_output_display)}"
# status display
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(provisioning_status_display, (0, 0, 0, padding)),
)
# display error, failure cause
if provisioning_status_error:
message = provisioning_status_error.get("message")
code = provisioning_status_error.get("code")
error_text = f"{code}: {message}"
error_display = f"Error: {{{colorize_string(color=CheckTaskStatus.error.color, value=error_text)}}}"
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(error_display, (0, 0, 0, padding))
)
# show failure cause
if provisioning_status_failure_cause:
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=basic_property_display(
label="Failure Cause", value=provisioning_status_failure_cause, padding=inner_padding
),
)
        # logErrors shown above summary detail level
        provisioning_status_log_errors = provisioning_status.get("logErrors")
        if detail_level > ResourceOutputDetailLevel.summary.value and provisioning_status_log_errors:
            check_manager.add_display(
                target_name=target_name,
                namespace=namespace,
                display=Padding("Log Errors:", (0, 0, 0, inner_padding)),
            )
            # log error entries have no fixed schema; render each entry as-is
            for log_error in provisioning_status_log_errors:
                check_manager.add_display(
                    target_name=target_name,
                    namespace=namespace,
                    display=Padding(f"- {colorize_string(log_error)}", (0, 0, 0, inner_padding + 2)),
                )
# runtime status (required for profiles)
runtime_status = status.get("runtimeStatus", {})
if runtime_status:
# calculate status
runtime_level = runtime_status.get("level")
runtime_status_enum = ResourceState.map_to_status(runtime_level)
description = runtime_status.get("description")
# create display
runtime_status_display = (
f"Runtime Status: {{{colorize_string(color=runtime_status_enum.color, value=runtime_status.get('level'))}}}"
)
if description and detail_level > ResourceOutputDetailLevel.summary.value:
runtime_status_display += f" - {colorize_string(description)}"
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(runtime_status_display, (0, 0, 0, padding)),
)
elif resource_kind == DataflowResourceKinds.DATAFLOWPROFILE.value:
runtime_status_enum = CheckTaskStatus.error
# add evals if necessary
if provisioning_status_enum != CheckTaskStatus.skipped:
# todo - add error object to eval value?
status_obj = {"status.provisioningStatus.status": provisioning_status_status}
if provisioning_status_error:
status_obj["status.provisioningStatus.error"] = provisioning_status_error
check_manager.add_target_eval(
target_name=target_name,
namespace=namespace,
status=provisioning_status_enum.value,
resource_name=resource_name,
resource_kind=resource_kind,
value=status_obj,
)
if runtime_status_enum != CheckTaskStatus.skipped:
check_manager.add_target_eval(
target_name=target_name,
namespace=namespace,
status=runtime_status_enum.value,
resource_name=resource_name,
resource_kind=resource_kind,
value={"status.runtimeStatus.level": runtime_status.get("level")},
)


def _process_dataflow_sourcesettings(
check_manager: CheckManager,
target: str,
namespace: str,
dataflow_name: str,
endpoints: List[dict],
operation: dict,
detail_level: int,
padding: int,
):
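    """Validate and display a dataflow source operation.

    Verifies that `sourceSettings.endpointRef` resolves to an endpoint in the
    same namespace with a valid source type (kafka or mqtt), adding evals for
    both checks; extra details are shown at higher detail levels.
    """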
inner_padding = padding + PADDING_SIZE
settings = operation.get("sourceSettings", {})
# show endpoint ref
# TODO - lots of shared code for validating source/dest endpoints, consider refactoring
endpoint_ref = settings.get("endpointRef")
# currently we are only looking for endpoint references in the same namespace
# duplicate names should not exist, so check the first endpoint that matches the name ref
endpoint_ref_string = "not found"
endpoint_ref_status = endpoint_type_status = CheckTaskStatus.error
endpoint_type_status_string = "invalid"
found_endpoint = next(
(endpoint for endpoint in endpoints if "name" in endpoint and endpoint["name"] == endpoint_ref), None
)
endpoint_type = found_endpoint["type"] if found_endpoint and "type" in found_endpoint else None
if found_endpoint:
endpoint_ref_status = CheckTaskStatus.success
endpoint_ref_string = "detected"
endpoint_type_valid = endpoint_type and endpoint_type.lower() in valid_source_endpoint_types
endpoint_type_status = CheckTaskStatus.success if endpoint_type_valid else CheckTaskStatus.error
endpoint_type_status_string = "valid" if endpoint_type_valid else f"has invalid type: {endpoint_type}"
endpoint_ref_display = colorize_string(value=endpoint_ref_string, color=endpoint_ref_status.color)
endpoint_validity_display = colorize_string(
color=endpoint_type_status.color, value=endpoint_type_status_string
)
# valid endpoint ref eval
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=endpoint_ref_status.value,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"spec.operations[*].sourceSettings.endpointRef": endpoint_ref},
)
# valid source endpoint type eval
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=endpoint_type_status.value,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"ref(spec.operations[*].sourceSettings.endpointRef).endpointType": endpoint_type},
)
if detail_level > ResourceOutputDetailLevel.summary.value:
check_manager.add_display(
target_name=target, namespace=namespace, display=Padding("\nSource:", (0, 0, 0, padding))
)
endpoint_name_display = f"{{{colorize_string(value=endpoint_ref)}}}"
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
f"Dataflow Endpoint {endpoint_name_display} {endpoint_ref_display}, {endpoint_validity_display}",
(0, 0, 0, padding + PADDING_SIZE),
),
)
elif not found_endpoint or not endpoint_type_valid:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
"[red]Invalid source endpoint reference[/red]", (0, 0, 0, padding - PADDING_SIZE)
),
)
if detail_level > ResourceOutputDetailLevel.detail.value:
for label, key in [
# TODO - validate asset ref / colorize
("DeviceRegistry Asset Reference", "assetRef"),
("Schema Reference", "schemaRef"),
("Serialization Format", "serializationFormat"),
]:
val = settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=inner_padding),
)
# data source strings - not on summary
if detail_level > ResourceOutputDetailLevel.summary.value:
data_sources = settings.get("dataSources", [])
if data_sources:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("Data Sources:", (0, 0, 0, inner_padding)),
)
for data_source in data_sources:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(f"- {colorize_string(data_source)}", (0, 0, 0, inner_padding + 2)),
)


def _process_dataflow_transformationsettings(
check_manager: CheckManager, target: str, namespace: str, resource: dict, detail_level: int, padding: int
):
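    """Display a dataflow built-in transformation operation.

    Shown only above summary detail level; datasets, filters, and maps (with
    their inputs) appear at verbose detail level. No evals are emitted here.
    """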
settings = resource.get("builtInTransformationSettings", {})
# only show details on non-summary
if detail_level > ResourceOutputDetailLevel.summary.value:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("\nBuilt-In Transformation:", (0, 0, 0, padding)),
)
padding += PADDING_SIZE
inner_padding = padding + PADDING_SIZE

        def _process_inputs(inputs: List[str]):
if inputs:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("Inputs:", (0, 0, 0, inner_padding)),
)
                for input_value in inputs:
                    check_manager.add_display(
                        target_name=target,
                        namespace=namespace,
                        display=Padding(f"- {colorize_string(input_value)}", (0, 0, 0, inner_padding + 2)),
                    )
        # extra properties
        for label, key in [
            ("Schema Reference", "schemaRef"),
            ("Serialization Format", "serializationFormat"),
        ]:
            val = settings.get(key)
            if val:
                check_manager.add_display(
                    target_name=target,
                    namespace=namespace,
                    display=basic_property_display(label=label, value=val, padding=padding),
                )
# only show datasets, filters, maps on verbose
if detail_level > ResourceOutputDetailLevel.detail.value:
# datasets
datasets = settings.get("datasets", [])
if datasets:
check_manager.add_display(
target_name=target, namespace=namespace, display=Padding("Datasets:", (0, 0, 0, padding))
)
for dataset in datasets:
for label, key in [
("Description", "description"),
("Key", "key"),
("Expression", "expression"),
("Schema", "schemaRef"),
]:
val = dataset.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=inner_padding),
)
inputs = dataset.get("inputs", [])
_process_inputs(inputs)
# filters
filters = settings.get("filter", [])
if filters:
check_manager.add_display(
target_name=target, namespace=namespace, display=Padding("Filters:", (0, 0, 0, padding))
)
                for filter_config in filters:
                    for label, key in [
                        ("Description", "description"),
                        ("Expression", "expression"),
                        ("Operation Type", "type"),
                    ]:
                        val = filter_config.get(key)
                        if val:
                            check_manager.add_display(
                                target_name=target,
                                namespace=namespace,
                                display=basic_property_display(label=label, value=val, padding=inner_padding),
                            )
                    inputs = filter_config.get("inputs", [])
                    _process_inputs(inputs)
# maps
maps = settings.get("map", [])
if maps:
check_manager.add_display(
target_name=target, namespace=namespace, display=Padding("Maps:", (0, 0, 0, padding))
)
                for map_config in maps:
                    for label, key in [
                        ("Description", "description"),
                        ("Expression", "expression"),
                        ("Output", "output"),
                        ("Transformation Type", "type"),
                    ]:
                        val = map_config.get(key)
                        if val:
                            check_manager.add_display(
                                target_name=target,
                                namespace=namespace,
                                display=basic_property_display(label=label, value=val, padding=inner_padding),
                            )
                    inputs = map_config.get("inputs", [])
                    _process_inputs(inputs)


def _process_dataflow_destinationsettings(
check_manager: CheckManager,
target: str,
namespace: str,
dataflow_name: str,
endpoints: List[dict],
operation: dict,
detail_level: int,
padding: int,
):
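    """Validate and display a dataflow destination operation.

    Verifies that `destinationSettings.endpointRef` resolves to an endpoint in
    the same namespace, adding an eval; the data destination is shown at
    verbose detail level.
    """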
settings = operation.get("destinationSettings", {})
if detail_level > ResourceOutputDetailLevel.summary.value:
check_manager.add_display(
target_name=target, namespace=namespace, display=Padding("\nDestination:", (0, 0, 0, padding))
)
endpoint_ref = settings.get("endpointRef")
# currently we are only looking for endpoint references in the same namespace
# duplicate names should not exist, so check the first endpoint that matches the name ref
endpoint_match = next(
(endpoint for endpoint in endpoints if "name" in endpoint and endpoint["name"] == endpoint_ref), None
)
endpoint_validity = "valid"
endpoint_status = CheckTaskStatus.success
if not endpoint_match:
endpoint_validity = "not found"
endpoint_status = CheckTaskStatus.error
# valid endpoint ref eval
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=endpoint_status.value,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"spec.operations[*].destinationSettings.endpointRef": endpoint_ref},
)
# show dataflow endpoint ref on detail
if detail_level > ResourceOutputDetailLevel.summary.value:
padding += PADDING_SIZE
endpoint_name_display = f"{{{colorize_string(value=endpoint_ref)}}}"
endpoint_validity_display = colorize_string(color=endpoint_status.color, value=endpoint_validity)
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
f"Dataflow Endpoint {endpoint_name_display} {endpoint_validity_display}",
(0, 0, 0, padding),
),
)
elif not endpoint_match:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
"[red]Invalid destination endpoint reference[/red]", (0, 0, 0, padding - PADDING_SIZE)
),
)
# only show destination on verbose
if detail_level > ResourceOutputDetailLevel.detail.value:
for label, key in [
("Data Destination", "dataDestination"),
]:
val = settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)


def _process_endpoint_authentication(
endpoint_settings: dict, check_manager: CheckManager, target: str, namespace: str, padding: int, detail_level: int
) -> None:
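    """Display an endpoint's authentication method and its settings.

    Unknown methods are flagged in red; method-specific properties (secret
    refs, audience, client/tenant IDs, etc.) appear at verbose detail level.
    """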
auth_property_dict = {
DataFlowEndpointAuthenticationType.access_token.value: {
"key": "accessTokenSettings",
"displays": [
("Secret Reference", "secretRef"),
],
},
DataFlowEndpointAuthenticationType.system_assigned.value: {
"key": "systemAssignedManagedIdentitySettings",
"displays": [
("Audience", "audience"),
],
},
DataFlowEndpointAuthenticationType.user_assigned.value: {
"key": "userAssignedManagedIdentitySettings",
"displays": [
("Client ID", "clientId"),
("Scope", "scope"),
("Tenant ID", "tenantId"),
],
},
DataFlowEndpointAuthenticationType.x509.value: {
"key": "x509CertificateSettings",
"displays": [
("Secret Ref", "secretRef"),
],
},
DataFlowEndpointAuthenticationType.service_account_token.value: {
"key": "serviceAccountTokenSettings",
"displays": [
("Audience", "audience"),
],
},
DataFlowEndpointAuthenticationType.sasl.value: {
"key": "saslSettings",
"displays": [
("Type", "saslType"),
("Secret Ref", "secretRef"),
],
},
DataFlowEndpointAuthenticationType.anonymous.value: {
"key": "anonymousSettings",
"displays": [],
},
}
auth: dict = endpoint_settings.get("authentication", {})
auth_method = auth.get("method")
    # display unknown auth method
    if auth_method not in auth_property_dict:
        check_manager.add_display(
            target_name=target,
            namespace=namespace,
            display=Padding(f"[red]Unknown authentication method: {auth_method}[/red]", (0, 0, 0, padding)),
)
return
# display auth method
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label="Authentication Method", value=auth_method, padding=padding),
)
# show details for various auth methods
if detail_level > ResourceOutputDetailLevel.detail.value:
auth_properties: dict = auth_property_dict.get(auth_method, {})
auth_settings_key = auth_properties.get("key")
auth_obj = auth.get(auth_settings_key)
if auth_obj:
for label, key in auth_properties.get("displays", []):
val = auth_obj.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding + PADDING_SIZE),
)


def _process_endpoint_tls(
tls_settings: dict, check_manager: CheckManager, target: str, namespace: str, padding: int
) -> None:
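    """Display endpoint TLS settings: mode and trusted CA ConfigMap reference."""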
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("TLS:", (0, 0, 0, padding)),
)
for label, key in [
("Mode", "mode"),
("Trusted CA ConfigMap", "trustedCaCertificateConfigMapRef"),
]:
# TODO - validate ref?
val = tls_settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=(padding + PADDING_SIZE)),
)


def _process_endpoint_mqttsettings(
check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int
) -> None:
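    """Display MQTT endpoint settings: host, protocol, and authentication,
    plus additional client options and TLS at verbose detail level."""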
settings = spec.get("mqttSettings", {})
for label, key in [
("MQTT Host", "host"),
("Protocol", "protocol"),
]:
val = settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)
# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
detail_level=detail_level,
)
if detail_level > ResourceOutputDetailLevel.detail.value:
for label, key in [
("Cloud Event Attributes", "cloudEventAttributes"),
("Client ID Prefix", "clientIdPrefix"),
("Keep Alive (s)", "keepAliveSeconds"),
("Max Inflight Messages", "maxInflightMessages"),
("QOS", "qos"),
("Retain", "retain"),
("Session Expiry (s)", "sessionExpirySeconds"),
]:
val = settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)
# TLS
tls = settings.get("tls", {})
if tls:
            _process_endpoint_tls(
tls_settings=tls,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=padding,
)


def _process_endpoint_kafkasettings(
check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int
) -> None:
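    """Display Kafka endpoint settings: host, consumer group ID, and
    authentication, plus additional options, TLS, and batching at verbose
    detail level."""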
inner_padding = padding + PADDING_SIZE
settings = spec.get("kafkaSettings", {})
for label, key in [
("Kafka Host", "host"),
("Consumer Group ID", "consumerGroupId"),
]:
val = settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)
# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
detail_level=detail_level,
)
if detail_level > ResourceOutputDetailLevel.detail.value:
# extra properties
for label, key in [
("Cloud Event Attributes", "cloudEventAttributes"),
("Compression", "compression"),
("Copy MQTT Properties", "copyMqttProperties"),
("Acks", "kafkaAcks"),
("Partition Strategy", "partitionStrategy"),
]:
val = settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)
# TLS
tls = settings.get("tls", {})
if tls:
            _process_endpoint_tls(
tls_settings=tls,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=padding,
)
# batching
batching = settings.get("batching", {})
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("Batching:", (0, 0, 0, padding)),
)
for label, key in [
("Latency (ms)", "latencyMs"),
("Max Bytes", "maxBytes"),
("Max Messages", "maxMessages"),
("Mode", "mode"),
]:
val = batching.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=inner_padding),
)


def _process_endpoint_fabriconelakesettings(
check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int
) -> None:
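    """Display Microsoft Fabric OneLake endpoint settings: host, path type, and
    authentication, plus lakehouse/workspace names and batching at verbose
    detail level."""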
settings = spec.get("fabricOneLakeSettings", {})
for label, key in [("Fabric Host", "host"), ("Path Type", "oneLakePathType")]:
val = settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)
# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
detail_level=detail_level,
)
if detail_level > ResourceOutputDetailLevel.detail.value:
names = settings.get("names", {})
for label, key in [
("Lakehouse Name", "lakehouseName"),
("Workspace Name", "workspaceName"),
]:
val = names.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)
batching = settings.get("batching", {})
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("Batching:", (0, 0, 0, padding)),
)
padding += PADDING_SIZE
for label, key in [
("Latency (s)", "latencySeconds"),
("Max Messages", "maxMessages"),
]:
val = batching.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)


def _process_endpoint_datalakestoragesettings(
check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int
) -> None:
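    """Display Azure Data Lake Storage endpoint settings: host and
    authentication, plus batching at verbose detail level."""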
settings = spec.get("datalakeStorageSettings", {})
for label, key in [("DataLake Host", "host")]:
val = settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)
# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
detail_level=detail_level,
)
if detail_level > ResourceOutputDetailLevel.detail.value:
batching = settings.get("batching", {})
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("Batching:", (0, 0, 0, padding)),
)
padding += PADDING_SIZE
for label, key in [
("Latency (s)", "latencySeconds"),
("Max Messages", "maxMessages"),
]:
val = batching.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)


def _process_endpoint_dataexplorersettings(
check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int
) -> None:
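    """Display Azure Data Explorer endpoint settings: database, host, and
    authentication, plus batching at verbose detail level."""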
settings = spec.get("dataExplorerSettings", {})
for label, key in [("Database Name", "database"), ("Data Explorer Host", "host")]:
val = settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)
# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
detail_level=detail_level,
)
if detail_level > ResourceOutputDetailLevel.detail.value:
batching = settings.get("batching", {})
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("Batching:", (0, 0, 0, padding)),
)
padding += PADDING_SIZE
for label, key in [
("Latency (s)", "latencySeconds"),
("Max Messages", "maxMessages"),
]:
val = batching.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)


def _process_endpoint_localstoragesettings(
check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int
) -> None:
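    """Display local storage endpoint settings: persistent volume claim
    reference and authentication."""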
# TODO - validate reference
settings = spec.get("localStorageSettings", {})
persistent_volume_claim = settings.get("persistentVolumeClaimRef")
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(f"Persistent Volume Claim: {persistent_volume_claim}", (0, 0, 0, padding)),
)
# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
detail_level=detail_level,
)


def check_dataflows_deployment(
as_list: bool = False,
detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_kinds: Optional[List[str]] = None,
    resource_name: Optional[str] = None,
) -> List[dict]:
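    """Run post-deployment checks for the Dataflow API.

    Maps each dataflow resource kind (runtime, profiles, endpoints, dataflows)
    to its evaluator and delegates to check_post_deployment.
    """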
evaluate_funcs = {
CoreServiceResourceKinds.RUNTIME_RESOURCE: evaluate_core_service_runtime,
DataflowResourceKinds.DATAFLOWPROFILE: evaluate_dataflow_profiles,
DataflowResourceKinds.DATAFLOWENDPOINT: evaluate_dataflow_endpoints,
DataflowResourceKinds.DATAFLOW: evaluate_dataflows,
}
return check_post_deployment(
api_info=DATAFLOW_API_V1,
check_name=dataflow_api_check_name,
check_desc=dataflow_api_check_desc,
evaluate_funcs=evaluate_funcs,
as_list=as_list,
detail_level=detail_level,
resource_kinds=resource_kinds,
resource_name=resource_name,
)


def evaluate_core_service_runtime(
as_list: bool = False,
detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_name: Optional[str] = None,
):
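    """Evaluate the health of dataflow operator pods in each namespace."""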
check_manager = CheckManager(
check_name=dataflow_runtime_check_name,
check_desc=dataflow_runtime_check_desc,
)
operators = get_namespaced_pods_by_prefix(
prefix=DATAFLOW_OPERATOR_PREFIX,
namespace="",
label_selector=DATAFLOW_NAME_LABEL,
)
if resource_name:
operators = filter_resources_by_name(
resources=operators,
resource_name=resource_name,
)
if not operators:
check_manager.add_target(target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value)
check_manager.add_display(
target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value,
display=Padding("Unable to fetch pods.", (0, 0, 0, PADDING)),
)
for namespace, pods in get_resources_grouped_by_namespace(operators):
check_manager.add_target(
target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value,
namespace=namespace,
)
check_manager.add_display(
target_name=CoreServiceResourceKinds.RUNTIME_RESOURCE.value,
namespace=namespace,
display=Padding(
f"Dataflow operator in namespace {{[purple]{namespace}[/purple]}}",
(0, 0, 0, PADDING),
),
)
evaluate_pod_health(
check_manager=check_manager,
target=CoreServiceResourceKinds.RUNTIME_RESOURCE.value,
pods=pods,
namespace=namespace,
padding=PADDING + 2,
detail_level=detail_level,
)
    return check_manager.as_dict(as_list=as_list)


def evaluate_dataflows(
as_list: bool = False,
detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_name: Optional[str] = None,
):
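    """Evaluate Dataflow resources for each namespace.

    Validates resource status, profile and endpoint references, and operation
    counts (between two and three operations, with exactly one source and one
    destination); disabled dataflows are skipped.
    """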
check_manager = CheckManager(
check_name=dataflows_check_name,
check_desc=dataflows_check_desc,
)
all_dataflows = get_resources_by_name(
api_info=DATAFLOW_API_V1,
kind=DataflowResourceKinds.DATAFLOW,
resource_name=resource_name,
)
target = dataflow_target
# No dataflows - skip
if not all_dataflows:
no_dataflows_text = "No Dataflow resources detected in any namespace."
check_manager.add_target(target_name=target)
check_manager.add_target_eval(
target_name=target, status=CheckTaskStatus.skipped.value, value={"dataflows": no_dataflows_text}
)
check_manager.add_display(
target_name=target,
display=Padding(no_dataflows_text, (0, 0, 0, PADDING)),
)
return check_manager.as_dict(as_list=as_list)
for namespace, dataflows in get_resources_grouped_by_namespace(all_dataflows):
check_manager.add_target(target_name=target, namespace=namespace)
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(f"Dataflows in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING)),
)
# conditions
check_manager.add_target_conditions(
target_name=target,
namespace=namespace,
conditions=[
                # at most 3 operations (source, destination, optional built-in transformation)
"len(spec.operations)<=3",
# valid source endpoint
"spec.operations[*].sourceSettings.endpointRef",
"ref(spec.operations[*].sourceSettings.endpointRef).endpointType in ('kafka','mqtt')",
# valid destination endpoint
"spec.operations[*].destinationSettings.endpointRef",
# single source/destination
"len(spec.operations[*].sourceSettings)==1",
"len(spec.operations[*].destinationSettings)==1",
],
)
# profile names for reference lookup
all_profiles = get_resources_by_name(
api_info=DATAFLOW_API_V1,
kind=DataflowResourceKinds.DATAFLOWPROFILE,
namespace=namespace,
resource_name=None,
)
profile_names = {profile.get("metadata", {}).get("name") for profile in all_profiles}
all_endpoints = get_resources_by_name(
api_info=DATAFLOW_API_V1,
kind=DataflowResourceKinds.DATAFLOWENDPOINT,
namespace=namespace,
resource_name=None,
)
endpoints = [
{
"name": endpoint.get("metadata", {}).get("name"),
"type": endpoint.get("spec", {}).get("endpointType"),
}
for endpoint in all_endpoints
]
for dataflow in list(dataflows):
spec = dataflow.get("spec", {})
dataflow_name = dataflow.get("metadata", {}).get("name")
mode = spec.get("mode")
mode_lower = str(mode).lower() if mode else "unknown"
dataflow_enabled = mode_lower == "enabled"
mode_display = colorize_string(value=mode_lower)
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
f"\n- Dataflow {{{colorize_string(value=dataflow_name)}}} is {mode_display}",
(0, 0, 0, PADDING),
),
)
# if dataflow is disabled, skip evaluations and displays
if not dataflow_enabled:
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=CheckTaskStatus.skipped.value,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"spec.mode": mode},
)
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
colorize_string(
value=f"{CheckTaskStatus.skipped.emoji} Skipping evaluation of disabled dataflow",
color=CheckTaskStatus.skipped.color,
),
(0, 0, 0, PADDING + 2),
),
)
continue
status = dataflow.get("status", {})
_process_dataflow_resource_status(
check_manager=check_manager,
target_name=target,
namespace=namespace,
status=status,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
detail_level=detail_level,
padding=INNER_PADDING,
)
profile_ref = spec.get("profileRef")
# profileRef is optional, only show an error if it exists but is invalid
if profile_ref:
profile_ref_status = (
CheckTaskStatus.error if profile_ref not in profile_names else CheckTaskStatus.success
)
# valid profileRef eval
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=profile_ref_status.value,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"spec.profileRef": profile_ref},
)
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
f"Dataflow Profile: {{{colorize_string(color=profile_ref_status.color, value=profile_ref)}}}",
(0, 0, 0, INNER_PADDING),
),
)
if profile_ref_status == CheckTaskStatus.error:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
colorize_string(
color=profile_ref_status.color, value="Invalid Dataflow Profile reference"
),
(0, 0, 0, INNER_PADDING),
),
)
operations = spec.get("operations", [])
# check operations count
operations_status = CheckTaskStatus.success.value
if not operations or not (2 <= len(operations) <= 3):
operations_status = CheckTaskStatus.error.value
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=operations_status,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"len(operations)": len(operations)},
)
if operations and detail_level > ResourceOutputDetailLevel.summary.value:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("Operations:", (0, 0, 0, INNER_PADDING)),
)
operation_padding = INNER_PADDING + PADDING_SIZE
sources = destinations = 0
for operation in operations:
op_type = operation.get("operationType", "").lower()
if op_type == DataflowOperationType.source.value:
sources += 1
_process_dataflow_sourcesettings(
check_manager=check_manager,
target=target,
namespace=namespace,
dataflow_name=dataflow_name,
endpoints=endpoints,
operation=operation,
detail_level=detail_level,
padding=operation_padding,
)
elif op_type == DataflowOperationType.builtin_transformation.value:
_process_dataflow_transformationsettings(
check_manager=check_manager,
target=target,
namespace=namespace,
resource=operation,
detail_level=detail_level,
padding=operation_padding,
)
elif op_type == DataflowOperationType.destination.value:
destinations += 1
_process_dataflow_destinationsettings(
check_manager=check_manager,
target=target,
namespace=namespace,
dataflow_name=dataflow_name,
endpoints=endpoints,
operation=operation,
detail_level=detail_level,
padding=operation_padding,
)
# eval source amount (1)
sources_status = destinations_status = CheckTaskStatus.success.value
if sources != 1:
sources_status = CheckTaskStatus.error.value
message = (
"Missing source operation" if sources == 0 else f"Too many source operations: {sources}"
)
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(f"[red]{message}[/red]", (0, 0, 0, INNER_PADDING)),
)
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=sources_status,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"len(spec.operations[*].sourceSettings)": sources},
)
if destinations != 1:
destinations_status = CheckTaskStatus.error.value
message = (
"Missing destination operation"
if destinations == 0
else f"Too many destination operations: {destinations}"
)
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(f"[red]{message}[/red]", (0, 0, 0, INNER_PADDING)),
)
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=destinations_status,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"len(spec.operations[*].destinationSettings)": destinations},
)
return check_manager.as_dict(as_list=as_list)


def evaluate_dataflow_endpoints(
as_list: bool = False,
detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_name: Optional[str] = None,
):
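    """Evaluate DataflowEndpoint resources for each namespace.

    Validates `spec.endpointType`, shows resource status, and renders
    type-specific settings at higher detail levels.
    """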
check_manager = CheckManager(
check_name=dataflow_endpoint_check_name,
check_desc=dataflow_endpoint_check_desc,
)
all_endpoints = get_resources_by_name(
api_info=DATAFLOW_API_V1,
kind=DataflowResourceKinds.DATAFLOWENDPOINT,
resource_name=resource_name,
)
target = dataflow_endpoint_target
if not all_endpoints:
no_endpoints_text = "No Dataflow Endpoints detected in any namespace."
check_manager.add_target(target_name=target)
check_manager.add_target_eval(
target_name=target, status=CheckTaskStatus.skipped.value, value={"endpoints": no_endpoints_text}
)
check_manager.add_display(
target_name=target,
display=Padding(no_endpoints_text, (0, 0, 0, PADDING)),
)
return check_manager.as_dict(as_list=as_list)
for namespace, endpoints in get_resources_grouped_by_namespace(all_endpoints):
check_manager.add_target(target_name=target, namespace=namespace, conditions=["spec.endpointType"])
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
f"Dataflow Endpoints in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING)
),
)
for endpoint in list(endpoints):
spec = endpoint.get("spec", {})
endpoint_name = endpoint.get("metadata", {}).get("name")
endpoint_type = spec.get("endpointType")
valid_endpoint_type = endpoint_type and endpoint_type.lower() in DataflowEndpointType.list()
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=CheckTaskStatus.success.value if valid_endpoint_type else CheckTaskStatus.error.value,
resource_name=endpoint_name,
resource_kind=DataflowResourceKinds.DATAFLOWENDPOINT.value,
value={"spec.endpointType": endpoint_type},
)
endpoint_string = f"Endpoint {{{colorize_string(value=endpoint_name)}}}"
detected_string = colorize_string(color="green", value="detected")
type_string = f"type: {colorize_string(color=DEFAULT_PROPERTY_DISPLAY_COLOR if valid_endpoint_type else 'red', value=endpoint_type)}"
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
f"\n- {endpoint_string} {detected_string}, {type_string}",
(0, 0, 0, PADDING),
),
)
status = endpoint.get("status", {})
_process_dataflow_resource_status(
check_manager=check_manager,
target_name=target,
namespace=namespace,
status=status,
resource_name=endpoint_name,
resource_kind=DataflowResourceKinds.DATAFLOWENDPOINT.value,
detail_level=detail_level,
padding=INNER_PADDING,
)
# endpoint details at higher detail levels
if detail_level > ResourceOutputDetailLevel.summary.value:
endpoint_processor_dict = {
DataflowEndpointType.mqtt.value: _process_endpoint_mqttsettings,
DataflowEndpointType.kafka.value: _process_endpoint_kafkasettings,
DataflowEndpointType.fabric_onelake.value: _process_endpoint_fabriconelakesettings,
DataflowEndpointType.datalake.value: _process_endpoint_datalakestoragesettings,
DataflowEndpointType.data_explorer.value: _process_endpoint_dataexplorersettings,
DataflowEndpointType.local_storage.value: _process_endpoint_localstoragesettings,
}
# process endpoint settings
if endpoint_type and endpoint_type.lower() in endpoint_processor_dict:
endpoint_processor_dict[endpoint_type.lower()](
check_manager=check_manager,
target=target,
namespace=namespace,
spec=spec,
detail_level=detail_level,
padding=INNER_PADDING,
)
else:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
colorize_string(color="red", value=f"Unknown endpoint type: {endpoint_type}"),
(0, 0, 0, INNER_PADDING),
),
)
return check_manager.as_dict(as_list=as_list)


def evaluate_dataflow_profiles(
as_list: bool = False,
detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_name: Optional[str] = None,
):
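    """Evaluate DataflowProfile resources for each namespace.

    Validates `spec.instanceCount`, shows resource status and diagnostics,
    evaluates profile pod health, and warns when the default profile is
    missing (unless filtered out by resource_name).
    """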
check_manager = CheckManager(
check_name=dataflow_profile_check_name,
check_desc=dataflow_profile_check_desc,
)
target = dataflow_profile_target
all_profiles = get_resources_by_name(
api_info=DATAFLOW_API_V1,
kind=DataflowResourceKinds.DATAFLOWPROFILE,
resource_name=resource_name,
)
if not all_profiles:
no_profiles_text = "No Dataflow Profiles detected in any namespace."
check_manager.add_target(target_name=target)
        # a resource_name filter may have excluded the default profile; skip instead of warn
default_profile_status = CheckTaskStatus.skipped if resource_name else CheckTaskStatus.warning
check_manager.add_target_eval(
target_name=target, status=default_profile_status.value, value={"profiles": no_profiles_text}
)
check_manager.add_display(
target_name=target,
display=Padding(no_profiles_text, (0, 0, 0, PADDING)),
)
return check_manager.as_dict(as_list=as_list)
for namespace, profiles in get_resources_grouped_by_namespace(all_profiles):
check_manager.add_target(
target_name=target,
namespace=namespace,
conditions=["spec.instanceCount", f"[*].metadata.name=='{DEFAULT_DATAFLOW_PROFILE}'"],
)
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
f"Dataflow Profiles in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, PADDING)
),
)
# warn if no default dataflow profile (unless possibly filtered)
default_profile_status = CheckTaskStatus.skipped if resource_name else CheckTaskStatus.warning
for profile in list(profiles):
profile_name = profile.get("metadata", {}).get("name")
# check for default dataflow profile
if profile_name == DEFAULT_DATAFLOW_PROFILE:
default_profile_status = CheckTaskStatus.success
spec = profile.get("spec", {})
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
f"\n- Profile {{{colorize_string(value=profile_name)}}} {colorize_string(color='green', value='detected')}",
(0, 0, 0, PADDING),
),
)
# evaluate status
status = profile.get("status", {})
_process_dataflow_resource_status(
check_manager=check_manager,
target_name=target,
namespace=namespace,
status=status,
resource_name=profile_name,
resource_kind=DataflowResourceKinds.DATAFLOWPROFILE.value,
detail_level=detail_level,
padding=INNER_PADDING,
)
# instance count
instance_count = spec.get("instanceCount")
has_instances = instance_count is not None and int(instance_count) >= 0
instance_status = CheckTaskStatus.success if has_instances else CheckTaskStatus.error
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=instance_status.value,
resource_name=profile_name,
resource_kind=DataflowResourceKinds.DATAFLOWPROFILE.value,
value={"spec.instanceCount": instance_count},
)
if has_instances:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(
label="Instance count",
value=instance_count,
color=instance_status.color,
padding=INNER_PADDING,
),
)
else:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("[red]No instance count set[/red]", (0, 0, 0, INNER_PADDING)),
)
# diagnostics on higher detail levels
if detail_level > ResourceOutputDetailLevel.summary.value:
log_padding = PADDING + PADDING_SIZE
log_inner_padding = log_padding + PADDING_SIZE
diagnostics = spec.get("diagnostics", {})
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("Diagnostic Logs:", (0, 0, 0, log_padding)),
)
# diagnostic logs
diagnostic_logs = diagnostics.get("logs", {})
diagnostic_log_level = diagnostic_logs.get("level")
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(
label="Log Level", value=diagnostic_log_level, padding=log_inner_padding
),
)
if detail_level > ResourceOutputDetailLevel.detail.value:
diagnostic_log_otelconfig = diagnostic_logs.get("openTelemetryExportConfig", {})
if diagnostic_log_otelconfig:
for label, key in [
("Endpoint", "otlpGrpcEndpoint"),
("Interval (s)", "intervalSeconds"),
("Level", "level"),
]:
val = diagnostic_log_otelconfig.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(
label=label, value=val, padding=log_inner_padding
),
)
# diagnostic metrics
diagnostic_metrics = diagnostics.get("metrics", {})
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("Diagnostic Metrics:", (0, 0, 0, log_padding)),
)
                diagnostic_metrics_prometheus_port = diagnostic_metrics.get("prometheusPort")
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(
label="Prometheus Port",
                        value=diagnostic_metrics_prometheus_port,
padding=log_inner_padding,
),
)
diagnostic_metrics_otelconfig = diagnostic_metrics.get("openTelemetryExportConfig", {})
if diagnostic_metrics_otelconfig:
for label, key in [
("Endpoint", "otlpGrpcEndpoint"),
("Interval (s)", "intervalSeconds"),
]:
val = diagnostic_metrics_otelconfig.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(
label=label, value=val, padding=log_inner_padding
),
)
# pod health - trailing `-` is important in case profiles have similar prefixes
pod_prefix = f"{DATAFLOW_PROFILE_POD_PREFIX}{profile_name}-"
profile_pods = get_namespaced_pods_by_prefix(
prefix=pod_prefix,
namespace=namespace,
label_selector=DATAFLOW_NAME_LABEL,
)
# only show pods if they exist
if profile_pods:
evaluate_pod_health(
check_manager=check_manager,
target=target,
pods=profile_pods,
namespace=namespace,
padding=INNER_PADDING,
detail_level=detail_level,
)
# default dataflow profile status, display warning if not success
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=default_profile_status.value,
resource_kind=DataflowResourceKinds.DATAFLOWPROFILE.value,
resource_name=DEFAULT_DATAFLOW_PROFILE,
value={f"[*].metadata.name=='{DEFAULT_DATAFLOW_PROFILE}'": default_profile_status.value},
)
if default_profile_status not in [CheckTaskStatus.success, CheckTaskStatus.skipped]:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
colorize_string(
color=default_profile_status.color,
value=f"\nDefault Dataflow Profile '{DEFAULT_DATAFLOW_PROFILE}' not found in namespace '{namespace}'",
),
(0, 0, 0, PADDING),
),
)
return check_manager.as_dict(as_list=as_list)