azext_edge/edge/providers/check/deviceregistry.py (565 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from typing import Any, Dict, List
from .base import (
CheckManager,
add_display_and_eval,
check_post_deployment,
generate_target_resource_name,
get_resources_by_name,
process_list_resource,
process_resource_properties,
get_resources_grouped_by_namespace,
)
from rich.padding import Padding
from ...common import CheckTaskStatus
from .common import (
ASSET_DATAPOINT_PROPERTIES,
ASSET_EVENT_PROPERTIES,
ASSET_PROPERTIES,
MAX_ASSET_DATAPOINTS,
MAX_ASSET_EVENTS,
PADDING_SIZE,
ResourceOutputDetailLevel,
)
from ..edge_api import (
DEVICEREGISTRY_API_V1,
DeviceRegistryResourceKinds,
)
def check_deviceregistry_deployment(
as_list: bool = False,
detail_level: int = ResourceOutputDetailLevel.summary.value,
resource_kinds: List[str] = None,
resource_name: str = None,
) -> List[dict]:
evaluate_funcs = {
DeviceRegistryResourceKinds.ASSET: evaluate_assets,
DeviceRegistryResourceKinds.ASSETENDPOINTPROFILE: evaluate_asset_endpoint_profiles,
}
return check_post_deployment(
api_info=DEVICEREGISTRY_API_V1,
check_name="enumerateDeviceRegistryApi",
check_desc="Enumerate Device Registry API resources",
resource_name=resource_name,
evaluate_funcs=evaluate_funcs,
as_list=as_list,
detail_level=detail_level,
resource_kinds=resource_kinds,
)
def evaluate_assets(
as_list: bool = False,
detail_level: int = ResourceOutputDetailLevel.summary.value,
resource_name: str = None,
) -> Dict[str, Any]:
check_manager = CheckManager(check_name="evalAssets", check_desc="Evaluate Assets")
asset_namespace_conditions = ["spec.assetEndpointProfileRef"]
target_assets = generate_target_resource_name(api_info=DEVICEREGISTRY_API_V1, resource_kind=DeviceRegistryResourceKinds.ASSET.value)
all_assets = get_resources_by_name(
api_info=DEVICEREGISTRY_API_V1,
kind=DeviceRegistryResourceKinds.ASSET,
resource_name=resource_name
)
if not all_assets:
fetch_assets_warning_text = "Unable to fetch assets in any namespaces."
check_manager.add_target(target_name=target_assets)
check_manager.add_display(target_name=target_assets, display=Padding(fetch_assets_warning_text, (0, 0, 0, 8)))
check_manager.add_target_eval(
target_name=target_assets,
status=CheckTaskStatus.skipped.value,
value=fetch_assets_warning_text
)
return check_manager.as_dict(as_list)
for (namespace, assets) in get_resources_grouped_by_namespace(all_assets):
check_manager.add_target(target_name=target_assets, namespace=namespace, conditions=asset_namespace_conditions)
check_manager.add_display(
target_name=target_assets,
namespace=namespace,
display=Padding(
f"Device Registry assets in namespace {{[purple]{namespace}[/purple]}}",
(0, 0, 0, 8)
)
)
assets: List[dict] = list(assets)
added_datapoint_conditions = False
added_event_conditions = False
added_status_conditions = False
for asset in assets:
padding = 10
asset_name = asset["metadata"]["name"]
asset_status_text = (
f"- Asset {{[bright_blue]{asset_name}[/bright_blue]}} detected."
)
check_manager.add_display(
target_name=target_assets,
namespace=namespace,
display=Padding(asset_status_text, (0, 0, 0, padding)),
)
asset_spec = asset["spec"]
endpoint_profile_uri = asset_spec.get("assetEndpointProfileRef", "")
endpoint_profile = get_resources_by_name(
api_info=DEVICEREGISTRY_API_V1,
kind=DeviceRegistryResourceKinds.ASSETENDPOINTPROFILE,
resource_name=endpoint_profile_uri
)
spec_padding = padding + PADDING_SIZE
endpoint_profile_uri_value = {"spec.assetEndpointProfileRef": endpoint_profile_uri}
endpoint_profile_uri_status = CheckTaskStatus.success.value
if endpoint_profile:
endpoint_profile_uri_text = (
f"Asset endpoint profile {{[bright_blue]{endpoint_profile_uri}[/bright_blue]}} property [green]detected[/green]."
)
else:
endpoint_profile_uri_text = (
f"Asset endpoint profile {{[bright_blue]{endpoint_profile_uri}[/bright_blue]}} [red]not detected[/red]."
)
endpoint_profile_uri_status = CheckTaskStatus.error.value
add_display_and_eval(
check_manager=check_manager,
target_name=target_assets,
display_text=endpoint_profile_uri_text,
eval_status=endpoint_profile_uri_status,
eval_value=endpoint_profile_uri_value,
resource_name=asset_name,
namespace=namespace,
padding=(0, 0, 0, spec_padding)
)
# data points
# all should be under one dataset
dataset = asset_spec.get("datasets", [])
data_points = []
if dataset:
data_points = dataset[0].get("dataPoints", [])
if data_points:
if not added_datapoint_conditions:
check_manager.add_target_conditions(
target_name=target_assets,
namespace=namespace,
conditions=[
"len(spec.datasets[0].dataPoints)",
"spec.datasets[0].dataPoints.dataSource"
]
)
added_datapoint_conditions = True
data_points_count = len(data_points)
data_points_value = {"len(spec.datasets[0].dataPoints)": data_points_count}
data_points_status = CheckTaskStatus.success.value
if data_points_count > MAX_ASSET_DATAPOINTS:
data_points_text = (
f"Data points [red]exceeding {MAX_ASSET_DATAPOINTS}[/red]. Detected {data_points_count}."
)
else:
data_points_text = (
f"[bright_blue]{data_points_count}[/bright_blue] data points detected."
)
add_display_and_eval(
check_manager=check_manager,
target_name=target_assets,
display_text=data_points_text,
eval_status=data_points_status,
eval_value=data_points_value,
resource_name=asset_name,
namespace=namespace,
padding=(0, 0, 0, spec_padding)
)
for index, data_point in enumerate(data_points):
data_point_data_source = data_point.get("dataSource", "")
datapoint_padding = spec_padding + PADDING_SIZE
data_point_data_source_value = {f"spec.datasets[0].dataPoints.[{index}].dataSource": data_point_data_source}
data_point_data_source_status = CheckTaskStatus.success.value
if data_point_data_source:
data_point_data_source_text = (
f"- Data source: {{[bright_blue]{data_point_data_source}[/bright_blue]}} [green]detected[/green]."
)
else:
data_point_data_source_text = (
"Data source [red]not detected[/red]."
)
data_point_data_source_status = CheckTaskStatus.error.value
add_display_and_eval(
check_manager=check_manager,
target_name=target_assets,
display_text=data_point_data_source_text,
eval_status=data_point_data_source_status,
eval_value=data_point_data_source_value,
resource_name=asset_name,
namespace=namespace,
padding=(0, 0, 0, datapoint_padding)
)
if detail_level > ResourceOutputDetailLevel.summary.value:
process_resource_properties(
check_manager=check_manager,
detail_level=detail_level,
target_name=target_assets,
prop_value=data_point,
properties=ASSET_DATAPOINT_PROPERTIES,
namespace=namespace,
padding=(0, 0, 0, datapoint_padding + PADDING_SIZE)
)
if detail_level > ResourceOutputDetailLevel.summary.value:
process_resource_properties(
check_manager=check_manager,
detail_level=detail_level,
target_name=target_assets,
prop_value=asset_spec,
properties=ASSET_PROPERTIES,
namespace=namespace,
padding=(0, 0, 0, spec_padding)
)
# events
events = asset_spec.get("events", [])
if events:
if not added_event_conditions:
check_manager.add_target_conditions(
target_name=target_assets,
namespace=namespace,
conditions=[
"len(spec.events)",
"spec.events.eventNotifier"
]
)
added_event_conditions = True
events_count = len(events)
events_count_value = {"len(spec.events)": events_count}
events_count_status = CheckTaskStatus.success.value
if events_count > MAX_ASSET_EVENTS:
events_count_text = (
f"Events [red]exceeding {MAX_ASSET_EVENTS}[/red]. Detected {events_count}."
)
events_count_status = CheckTaskStatus.error.value
else:
events_count_text = (
f"[bright_blue]{events_count}[/bright_blue] events detected."
)
add_display_and_eval(
check_manager=check_manager,
target_name=target_assets,
display_text=events_count_text,
eval_status=events_count_status,
eval_value=events_count_value,
resource_name=asset_name,
namespace=namespace,
padding=(0, 0, 0, spec_padding)
)
for index, event in enumerate(events):
event_notifier = event.get("eventNotifier", "")
event_padding = spec_padding + PADDING_SIZE
event_notifier_value = {f"spec.events.[{index}].eventNotifier": event_notifier}
event_notifier_status = CheckTaskStatus.success.value
if event_notifier:
event_notifier_text = (
f"- Event notifier: {{[bright_blue]{event_notifier}[/bright_blue]}} [green]detected[/green]."
)
else:
event_notifier_text = (
"Event notifier [red]not detected[/red]."
)
event_notifier_status = CheckTaskStatus.error.value
add_display_and_eval(
check_manager=check_manager,
target_name=target_assets,
display_text=event_notifier_text,
eval_status=event_notifier_status,
eval_value=event_notifier_value,
resource_name=asset_name,
namespace=namespace,
padding=(0, 0, 0, event_padding)
)
if detail_level > ResourceOutputDetailLevel.summary.value:
process_resource_properties(
check_manager=check_manager,
detail_level=detail_level,
target_name=target_assets,
prop_value=event,
properties=ASSET_EVENT_PROPERTIES,
namespace=namespace,
padding=(0, 0, 0, event_padding + PADDING_SIZE)
)
# status
status = asset_spec.get("status", "")
if status:
_process_asset_status(
check_manager=check_manager,
added_status_conditions=added_status_conditions,
target_assets=target_assets,
asset_name=asset_name,
status=status,
padding=spec_padding,
namespace=namespace
)
return check_manager.as_dict(as_list)
def evaluate_asset_endpoint_profiles(
as_list: bool = False,
detail_level: int = ResourceOutputDetailLevel.summary.value,
resource_name: str = None,
) -> Dict[str, Any]:
check_manager = CheckManager(check_name="evalAssetEndpointProfiles", check_desc="Evaluate Asset Endpoint Profiles")
endpoint_namespace_conditions = ["spec.uuid"]
all_asset_endpoint_profiles = get_resources_by_name(
api_info=DEVICEREGISTRY_API_V1,
kind=DeviceRegistryResourceKinds.ASSETENDPOINTPROFILE,
resource_name=resource_name
)
target_asset_endpoint_profiles = generate_target_resource_name(api_info=DEVICEREGISTRY_API_V1, resource_kind=DeviceRegistryResourceKinds.ASSETENDPOINTPROFILE.value)
if not all_asset_endpoint_profiles:
fetch_asset_endpoint_profiles_warning_text = "Unable to fetch asset endpoint profiles in any namespaces."
check_manager.add_target(target_name=target_asset_endpoint_profiles)
check_manager.add_display(target_name=target_asset_endpoint_profiles, display=Padding(fetch_asset_endpoint_profiles_warning_text, (0, 0, 0, 8)))
check_manager.add_target_eval(
target_name=target_asset_endpoint_profiles,
status=CheckTaskStatus.skipped.value,
value=fetch_asset_endpoint_profiles_warning_text
)
return check_manager.as_dict(as_list)
for (namespace, asset_endpoint_profiles) in get_resources_grouped_by_namespace(all_asset_endpoint_profiles):
check_manager.add_target(target_name=target_asset_endpoint_profiles, namespace=namespace, conditions=endpoint_namespace_conditions)
check_manager.add_display(
target_name=target_asset_endpoint_profiles,
namespace=namespace,
display=Padding(
f"Asset Endpoint Profiles in namespace {{[purple]{namespace}[/purple]}}",
(0, 0, 0, 8)
)
)
asset_endpoint_profiles: List[dict] = list(asset_endpoint_profiles)
added_transport_authentication_conditions = False
added_user_authentication_conditions = False
for asset_endpoint_profile in asset_endpoint_profiles:
asset_endpoint_profile_name = asset_endpoint_profile["metadata"]["name"]
padding = 10
asset_endpoint_profile_status_text = (
f"- Profile {{[bright_blue]{asset_endpoint_profile_name}[/bright_blue]}} detected."
)
check_manager.add_display(
target_name=target_asset_endpoint_profiles,
namespace=namespace,
display=Padding(asset_endpoint_profile_status_text, (0, 0, 0, padding)),
)
spec_padding = padding + PADDING_SIZE
asset_endpoint_profile_spec = asset_endpoint_profile["spec"]
endpoint_profile_uuid = asset_endpoint_profile_spec.get("uuid", "")
endpoint_profile_uuid_value = {"spec.uuid": endpoint_profile_uuid}
endpoint_profile_uuid_status = CheckTaskStatus.success.value
if endpoint_profile_uuid:
endpoint_profile_uuid_text = (
f"Uuid: {{[bright_blue]{endpoint_profile_uuid}[/bright_blue]}} [green]detected[/green]."
)
else:
endpoint_profile_uuid_text = (
"Uuid [red]not detected[/red]."
)
endpoint_profile_uuid_status = CheckTaskStatus.error.value
add_display_and_eval(
check_manager=check_manager,
target_name=target_asset_endpoint_profiles,
display_text=endpoint_profile_uuid_text,
eval_status=endpoint_profile_uuid_status,
eval_value=endpoint_profile_uuid_value,
resource_name=asset_endpoint_profile_name,
namespace=namespace,
padding=(0, 0, 0, spec_padding)
)
# transportAuthentication
transport_authentication = asset_endpoint_profile_spec.get("transportAuthentication", {})
if transport_authentication:
if not added_transport_authentication_conditions:
check_manager.add_target_conditions(
target_name=target_asset_endpoint_profiles,
namespace=namespace,
conditions=[
"spec.transportAuthentication.ownCertificates"
]
)
added_transport_authentication_conditions = True
check_manager.add_display(
target_name=target_asset_endpoint_profiles,
namespace=namespace,
display=Padding(
"Transport authentication:",
(0, 0, 0, spec_padding)
)
)
transport_authentication_own_certificates = transport_authentication.get("ownCertificates", None)
transport_authentication_own_certificates_value = {"spec.transportAuthentication.ownCertificates": transport_authentication_own_certificates}
transport_authentication_own_certificates_status = CheckTaskStatus.success.value
transport_authentication_padding = spec_padding + PADDING_SIZE
if transport_authentication_own_certificates is None:
transport_authentication_own_certificates_text = (
"Own certificates [red]not detected[/red]."
)
transport_authentication_own_certificates_status = CheckTaskStatus.error.value
else:
transport_authentication_own_certificates_text = (
f"Own certificates: {len(transport_authentication_own_certificates)} detected."
)
add_display_and_eval(
check_manager=check_manager,
target_name=target_asset_endpoint_profiles,
display_text=transport_authentication_own_certificates_text,
eval_status=transport_authentication_own_certificates_status,
eval_value=transport_authentication_own_certificates_value,
resource_name=asset_endpoint_profile_name,
namespace=namespace,
padding=(0, 0, 0, transport_authentication_padding)
)
if detail_level > ResourceOutputDetailLevel.detail.value and transport_authentication_own_certificates:
process_list_resource(
check_manager=check_manager,
target_name=target_asset_endpoint_profiles,
resource=transport_authentication_own_certificates,
namespace=namespace,
padding=transport_authentication_padding + PADDING_SIZE
)
# userAuthentication
user_authentication = asset_endpoint_profile_spec.get("userAuthentication", {})
if user_authentication:
if not added_user_authentication_conditions:
check_manager.add_target_conditions(
target_name=target_asset_endpoint_profiles,
namespace=namespace,
conditions=[
"spec.userAuthentication.mode",
"spec.userAuthentication.x509Credentials.certificateReference",
"spec.userAuthentication.usernamePasswordCredentials.usernameReference",
"spec.userAuthentication.usernamePasswordCredentials.passwordReference"
]
)
added_user_authentication_conditions = True
check_manager.add_display(
target_name=target_asset_endpoint_profiles,
namespace=namespace,
display=Padding(
"User authentication:",
(0, 0, 0, spec_padding)
)
)
# check required mode
user_authentication_mode = user_authentication.get("mode", "")
user_authentication_mode_value = {"spec.userAuthentication.mode": user_authentication_mode}
user_authentication_mode_status = CheckTaskStatus.success.value
user_authentication_padding = spec_padding + PADDING_SIZE
if user_authentication_mode:
user_authentication_mode_text = (
f"User authentication mode: {{[bright_blue]{user_authentication_mode}[/bright_blue]}} [green]detected[/green]."
)
else:
user_authentication_mode_text = (
"User authentication mode [red]not detected[/red]."
)
user_authentication_mode_status = CheckTaskStatus.error.value
add_display_and_eval(
check_manager=check_manager,
target_name=target_asset_endpoint_profiles,
display_text=user_authentication_mode_text,
eval_status=user_authentication_mode_status,
eval_value=user_authentication_mode_value,
resource_name=asset_endpoint_profile_name,
namespace=namespace,
padding=(0, 0, 0, user_authentication_padding)
)
if user_authentication_mode == "Certificate":
# check x509Credentials
user_authentication_x509_credentials = user_authentication.get("x509Credentials", {})
certificate_reference = user_authentication_x509_credentials.get("certificateReference", "")
user_authentication_x509_credentials_value = {"spec.userAuthentication.x509Credentials.certificateReference": certificate_reference}
user_authentication_x509_credentials_status = CheckTaskStatus.success.value
if certificate_reference:
user_authentication_x509_credentials_text = (
f"Certificate reference: {{[bright_blue]{certificate_reference}[/bright_blue]}} [green]detected[/green]."
)
else:
user_authentication_x509_credentials_text = (
"Certificate reference [red]not detected[/red]."
)
user_authentication_x509_credentials_status = CheckTaskStatus.error.value
add_display_and_eval(
check_manager=check_manager,
target_name=target_asset_endpoint_profiles,
display_text=user_authentication_x509_credentials_text,
eval_status=user_authentication_x509_credentials_status,
eval_value=user_authentication_x509_credentials_value,
resource_name=asset_endpoint_profile_name,
namespace=namespace,
padding=(0, 0, 0, user_authentication_padding + PADDING_SIZE)
)
elif user_authentication_mode == "UsernamePassword":
# check usernamePasswordCredentials
user_authentication_username_password_credentials = user_authentication.get("usernamePasswordCredentials", {})
username_reference = user_authentication_username_password_credentials.get("usernameReference", "")
password_reference = user_authentication_username_password_credentials.get("passwordReference", "")
user_authentication_username_password_credentials_value = {
"spec.userAuthentication.usernamePasswordCredentials.usernameReference": username_reference,
"spec.userAuthentication.usernamePasswordCredentials.passwordReference": password_reference
}
user_authentication_username_password_credentials_status = CheckTaskStatus.success.value
if username_reference and password_reference:
user_authentication_username_password_credentials_text = (
f"Username reference: {{[bright_blue]{username_reference}[/bright_blue]}} [green]detected[/green].\n"
f"Password reference: {{[bright_blue]{password_reference}[/bright_blue]}} [green]detected[/green]."
)
else:
user_authentication_username_password_credentials_text = (
"Username reference or password reference [red]not detected[/red]."
)
user_authentication_username_password_credentials_status = CheckTaskStatus.error.value
add_display_and_eval(
check_manager=check_manager,
target_name=target_asset_endpoint_profiles,
display_text=user_authentication_username_password_credentials_text,
eval_status=user_authentication_username_password_credentials_status,
eval_value=user_authentication_username_password_credentials_value,
resource_name=asset_endpoint_profile_name,
namespace=namespace,
padding=(0, 0, 0, user_authentication_padding + PADDING_SIZE)
)
if detail_level > ResourceOutputDetailLevel.summary.value:
process_resource_properties(
check_manager=check_manager,
detail_level=detail_level,
target_name=target_asset_endpoint_profiles,
prop_value=asset_endpoint_profile_spec,
properties=[
("additionalConfiguration", "Additional configuration", True),
("targetAddress", "Target address", False),
],
namespace=namespace,
padding=(0, 0, 0, spec_padding)
)
return check_manager.as_dict(as_list)
def _process_asset_status(
check_manager: CheckManager,
added_status_conditions: bool,
target_assets: str,
asset_name: str,
status: dict,
padding: int,
namespace: str,
):
if not added_status_conditions:
check_manager.add_target_conditions(
target_name=target_assets,
namespace=namespace,
conditions=["spec.status"]
)
added_status_conditions = True
status_value = {"spec.status": str(status)}
status_status = CheckTaskStatus.success.value
errors = status.get("errors", [])
if errors:
status_text = (
"Asset status [red]error[/red]."
)
for error in errors:
error_code = error.get("code", "")
message = error.get("message", "")
error_text = (
f"- Asset status error code: [red]{error_code}[/red]. Message: {message}"
)
check_manager.add_display(
target_name=target_assets,
namespace=namespace,
display=Padding(error_text, (0, 0, 0, padding + PADDING_SIZE)),
)
status_status = CheckTaskStatus.error.value
else:
status_text = (
"Asset status [green]OK[/green]."
)
add_display_and_eval(
check_manager=check_manager,
target_name=target_assets,
display_text=status_text,
eval_status=status_status,
eval_value=status_value,
resource_name=asset_name,
namespace=namespace,
padding=(0, 0, 0, padding)
)