azext_edge/edge/providers/check/mq.py (1,385 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from typing import Any, Dict, List, Optional
from azext_edge.edge.providers.check.base.display import add_display_and_eval, colorize_string
from .base import (
CheckManager,
check_post_deployment,
evaluate_pod_health,
get_resources_by_name,
get_resources_grouped_by_namespace,
process_dict_resource,
process_resource_properties,
validate_one_of_conditions,
process_custom_resource_status,
get_valid_resource_names,
validate_runtime_resource_ref,
)
from rich.console import NewLine
from rich.padding import Padding
from ...common import (
AIO_BROKER_DIAGNOSTICS_SERVICE,
CheckTaskStatus,
)
from .common import (
AIO_BROKER_DIAGNOSTICS_PROBE_PREFIX,
AIO_BROKER_FRONTEND_PREFIX,
AIO_BROKER_BACKEND_PREFIX,
AIO_BROKER_AUTH_PREFIX,
AIO_BROKER_HEALTH_MANAGER,
AIO_BROKER_OPERATOR,
BROKER_DIAGNOSTICS_PROPERTIES,
DEFAULT_PADDING,
CheckResult,
ResourceOutputDetailLevel,
ValidationResourceType,
)
from ...providers.edge_api import MQ_ACTIVE_API, MqResourceKinds
from ..support.mq import MQ_NAME_LABEL
from ..base import get_namespaced_pods_by_prefix, get_namespaced_service
def check_mq_deployment(
    as_list: bool = False,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_kinds: List[str] = None,
    resource_name: str = None,
) -> List[dict]:
    """Run the post-deployment checks for the MQTT broker API.

    Associates every broker resource kind with the evaluator defined for it in
    this module and forwards that mapping, together with the caller's options,
    to ``check_post_deployment``; its result is returned unchanged.
    """
    # One evaluator per resource kind; insertion order is kept as declared.
    evaluators_by_kind = {}
    evaluators_by_kind[MqResourceKinds.BROKER] = evaluate_brokers
    evaluators_by_kind[MqResourceKinds.BROKER_LISTENER] = evaluate_broker_listeners
    evaluators_by_kind[MqResourceKinds.BROKER_AUTHENTICATION] = evaluate_broker_authentications
    evaluators_by_kind[MqResourceKinds.BROKER_AUTHORIZATION] = evaluate_broker_authorizations

    post_deployment_options = {
        "api_info": MQ_ACTIVE_API,
        "check_name": "enumerateBrokerApi",
        "check_desc": "Enumerate MQTT Broker API resources",
        "evaluate_funcs": evaluators_by_kind,
        "as_list": as_list,
        "detail_level": detail_level,
        "resource_kinds": resource_kinds,
        "resource_name": resource_name,
    }
    return check_post_deployment(**post_deployment_options)
def evaluate_broker_listeners(
    as_list: bool = False,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_name: str = None,
) -> Dict[str, Any]:
    """Evaluate the BrokerListener resources, grouped by namespace.

    For every listener this checks its Broker owner reference, reports its
    status (when present), validates the authentication/authorization
    references and TLS settings of each port, and evaluates the Kubernetes
    service the listener exposes (once per service name within a namespace).

    Args:
        as_list: Forwarded to ``CheckManager.as_dict``.
        detail_level: One of the ``ResourceOutputDetailLevel`` values; controls
            how much is displayed, not what is evaluated.
        resource_name: Optional name filter handed to ``get_resources_by_name``.

    Returns:
        The check result produced by ``CheckManager.as_dict``.
    """
    check_manager = CheckManager(
        check_name="evalBrokerListeners",
        check_desc="Evaluate MQTT Broker Listeners",
    )

    target_listeners = "brokerlisteners.mqttbroker.iotoperations.azure.com"
    listener_conditions = [
        "len(brokerlisteners)>=1",
        "spec",
        "spec.serviceName",
    ]

    all_listeners = get_resources_by_name(
        api_info=MQ_ACTIVE_API,
        kind=MqResourceKinds.BROKER_LISTENER,
        resource_name=resource_name,
    )

    if not all_listeners:
        # Nothing matching an explicit name filter is a skip; no listeners at all is an error.
        status = CheckTaskStatus.skipped.value if resource_name else CheckTaskStatus.error.value
        fetch_listeners_error_text = f"Unable to fetch {MqResourceKinds.BROKER_LISTENER.value}s in any namespace."
        check_manager.add_target(target_name=target_listeners)
        check_manager.add_target_eval(
            target_name=target_listeners,
            status=status,
            value=fetch_listeners_error_text,
        )
        check_manager.add_display(
            target_name=target_listeners,
            display=Padding(fetch_listeners_error_text, (0, 0, 0, DEFAULT_PADDING)),
        )
        return check_manager.as_dict(as_list)

    for namespace, listeners in get_resources_grouped_by_namespace(all_listeners):
        check_manager.add_target(
            target_name=target_listeners,
            namespace=namespace,
            # Hand over a copy so every namespace starts from the same base conditions.
            conditions=list(listener_conditions),
        )
        check_manager.add_display(
            target_name=target_listeners,
            namespace=namespace,
            display=Padding(
                f"Broker Listeners in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, DEFAULT_PADDING)
            ),
        )

        listeners = list(listeners)
        listeners_count = len(listeners)
        listener_count_desc = f"- Expecting {colorize_string('>=1')} broker listeners per namespace. "
        listeners_eval_status = CheckTaskStatus.success.value

        if listeners_count >= 1:
            listeners_count_color = "green"
        else:
            listeners_count_color = "yellow"
            check_manager.set_target_status(
                target_name=target_listeners, namespace=namespace, status=CheckTaskStatus.warning.value
            )
        listener_count_desc += f"{colorize_string(color=listeners_count_color, value=f'Detected {listeners_count}')}."

        check_manager.add_display(
            target_name=target_listeners,
            namespace=namespace,
            display=Padding(listener_count_desc, (0, 0, 0, DEFAULT_PADDING)),
        )

        # Service names already evaluated in this namespace (filled by _evaluate_listener_service).
        processed_services = {}
        added_broker_ref_condition = False
        for listener in listeners:
            listener_metadata = listener["metadata"]
            namespace = namespace or listener_metadata["namespace"]
            listener_name: str = listener_metadata["name"]
            listener_spec = listener["spec"]
            listener_spec_service_name: str = listener_spec["serviceName"]
            listener_status_state = listener.get("status", {})

            listener_eval_value = {"spec": listener_spec}

            # check broker reference
            listener_desc = f"\n- Broker Listener {{{colorize_string(listener_name)}}}."
            broker_ref = listener_metadata.get("ownerReferences", [])
            _evaluate_broker_reference(
                check_manager=check_manager,
                owner_reference=broker_ref,
                target_name=target_listeners,
                namespace=namespace,
                resource_name=listener_name,
                added_condition=added_broker_ref_condition,
                display_text=listener_desc,
            )

            listener_properties_padding = DEFAULT_PADDING + 4
            if listener_status_state:
                process_custom_resource_status(
                    check_manager=check_manager,
                    status=listener_status_state,
                    target_name=target_listeners,
                    namespace=namespace,
                    resource_name=listener_name,
                    padding=listener_properties_padding,
                    detail_level=detail_level,
                )

            for port in listener_spec.get("ports", []):
                tls = port.get("tls", {})
                authn = port.get("authenticationRef", {})
                authz = port.get("authorizationRef", {})

                if detail_level != ResourceOutputDetailLevel.summary.value:
                    for label, port_key in [
                        ("Port", "port"),
                        ("Node Port", "nodePort"),
                    ]:
                        port_value = port.get(port_key)
                        if port_value:
                            check_manager.add_display(
                                target_name=target_listeners,
                                namespace=namespace,
                                display=Padding(
                                    f"{label}: {colorize_string(port_value)}",
                                    (0, 0, 0, 12),
                                ),
                            )

                if authn:
                    _evaluate_listener_port_ref(
                        check_manager=check_manager,
                        target_listeners=target_listeners,
                        namespace=namespace,
                        listener_name=listener_name,
                        ref_name=authn,
                        ref_kind=MqResourceKinds.BROKER_AUTHENTICATION.value,
                        condition="spec.ports[*].authenticationRef",
                        display_label="Authentication",
                        detail_level=detail_level,
                    )

                if authz:
                    _evaluate_listener_port_ref(
                        check_manager=check_manager,
                        target_listeners=target_listeners,
                        namespace=namespace,
                        listener_name=listener_name,
                        ref_name=authz,
                        ref_kind=MqResourceKinds.BROKER_AUTHORIZATION.value,
                        condition="spec.ports[*].authorizationRef",
                        display_label="Authorization",
                        detail_level=detail_level,
                    )

                if tls:
                    # "certManagerCertificateSpec" and "manual" are mutually exclusive
                    cert_spec = tls.get("certManagerCertificateSpec", {})
                    cert_spec_condition = "spec.ports[*].tls.certManagerCertificateSpec"
                    manual = tls.get("manual", {})
                    manual_condition = "spec.ports[*].tls.manual"

                    tls_eval_value = {
                        cert_spec_condition: cert_spec,
                        manual_condition: manual,
                    }

                    validate_one_of_conditions(
                        conditions=[
                            (cert_spec_condition, cert_spec),
                            (manual_condition, manual),
                        ],
                        check_manager=check_manager,
                        eval_value=tls_eval_value,
                        namespace=namespace,
                        target_name=target_listeners,
                        resource_name=listener_name,
                        padding=listener_properties_padding,
                    )

                    if detail_level == ResourceOutputDetailLevel.verbose.value:
                        check_manager.add_display(
                            target_name=target_listeners,
                            namespace=namespace,
                            display=Padding("TLS:", (0, 0, 0, listener_properties_padding)),
                        )
                        # TODO - add check for refs
                        for prop_name, prop_value in {
                            "Cert Manager certificate spec": cert_spec,
                            "Manual": manual,
                        }.items():
                            if prop_value:
                                process_dict_resource(
                                    check_manager=check_manager,
                                    target_name=target_listeners,
                                    resource=prop_value,
                                    namespace=namespace,
                                    padding=listener_properties_padding + 2,
                                    prop_name=prop_name,
                                )

            # Several listeners may share one service; evaluate each service only once.
            if listener_spec_service_name not in processed_services:
                _evaluate_listener_service(
                    check_manager=check_manager,
                    listener_spec=listener_spec,
                    processed_services=processed_services,
                    target_listeners=target_listeners,
                    namespace=namespace,
                    detail_level=detail_level,
                )

            check_manager.add_target_eval(
                target_name=target_listeners,
                namespace=namespace,
                status=listeners_eval_status,
                value=listener_eval_value,
                resource_name=listener_name,
            )

        # remove duplicate conditions
        # TODO - add remove duplicates on insertion under checkmanager itself
        # A separate name is used on purpose: rebinding `listener_conditions` here would
        # seed the next namespace's target with this namespace's per-port conditions.
        # dict.fromkeys keeps first-seen order, so the reported order is deterministic.
        namespace_conditions = (
            check_manager.targets.get(target_listeners, {}).get(namespace, {}).get("conditions", [])
        )
        check_manager.set_target_conditions(
            target_name=target_listeners,
            namespace=namespace,
            conditions=list(dict.fromkeys(namespace_conditions)),
        )

    return check_manager.as_dict(as_list)


def _evaluate_listener_port_ref(
    check_manager: CheckManager,
    target_listeners: str,
    namespace: str,
    listener_name: str,
    ref_name: str,
    ref_kind: str,
    condition: str,
    display_label: str,
    detail_level: int,
) -> None:
    """Check that a listener port's authn/authz reference names an existing resource.

    Registers ``condition`` on the listener target, records an evaluation keyed
    by ``condition`` (success when ``ref_name`` is among the names returned by
    ``get_valid_resource_names`` for ``ref_kind`` in ``namespace``, error
    otherwise) and displays the outcome when it is an error or when the detail
    level is above summary.
    """
    valid_names = get_valid_resource_names(
        api=MQ_ACTIVE_API,
        kind=ref_kind,
        namespace=namespace,
    )
    check_manager.add_target_conditions(
        target_name=target_listeners,
        namespace=namespace,
        conditions=[condition],
    )

    is_valid = ref_name in valid_names
    validity = "valid" if is_valid else "invalid"
    ref_color = "green" if is_valid else "red"
    ref_display = (
        f"{display_label} reference: {{{colorize_string(ref_name)}}} is "
        f"{colorize_string(color=ref_color, value=validity)}."
    )
    eval_status = CheckTaskStatus.success.value if is_valid else CheckTaskStatus.error.value

    # Invalid references are always shown; valid ones only above summary level.
    if detail_level > ResourceOutputDetailLevel.summary.value or not is_valid:
        check_manager.add_display(
            target_name=target_listeners,
            namespace=namespace,
            display=Padding(ref_display, (0, 0, 0, 12)),
        )

    check_manager.add_target_eval(
        target_name=target_listeners,
        namespace=namespace,
        status=eval_status,
        value={condition: ref_name},
        resource_name=listener_name,
    )
def evaluate_brokers(
    as_list: bool = False,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_name: str = None,
) -> Dict[str, Any]:
    """Evaluate the Broker resources, grouped by namespace.

    Per namespace this verifies that exactly one broker exists, then for each
    broker reports its status (when present), evaluates ``spec.cardinality``
    and ``spec.diagnostics``, and checks the diagnostics service. Finally the
    health of the broker runtime pods in that namespace is evaluated.

    Args:
        as_list: Forwarded to ``CheckManager.as_dict``.
        detail_level: One of the ``ResourceOutputDetailLevel`` values; controls
            how much is displayed.
        resource_name: Optional name filter handed to ``get_resources_by_name``.

    Returns:
        The check result produced by ``CheckManager.as_dict``.
    """
    check_manager = CheckManager(check_name="evalBrokers", check_desc="Evaluate MQTT Brokers")

    target_brokers = "brokers.mqttbroker.iotoperations.azure.com"
    broker_conditions = ["len(brokers)==1", "spec.mode"]
    # TODO - conditional evaluations
    cardinality_conditions = [
        "spec.cardinality",
        "spec.cardinality.backendChain.partitions>=1",
        "spec.cardinality.backendChain.redundancyFactor>=1",
        "spec.cardinality.backendChain.workers>=1",
        "spec.cardinality.frontend.replicas>=1",
    ]

    all_brokers: dict = get_resources_by_name(
        api_info=MQ_ACTIVE_API,
        kind=MqResourceKinds.BROKER,
        resource_name=resource_name,
    )

    if not all_brokers:
        # Nothing matching an explicit name filter is a skip; no brokers at all is an error.
        status = CheckTaskStatus.skipped.value if resource_name else CheckTaskStatus.error.value
        fetch_brokers_error_text = f"Unable to fetch {MqResourceKinds.BROKER.value}s in any namespace."
        check_manager.add_target(target_name=target_brokers)
        check_manager.add_target_eval(
            target_name=target_brokers,
            status=status,
            value=fetch_brokers_error_text,
        )
        check_manager.add_display(
            target_name=target_brokers,
            display=Padding(fetch_brokers_error_text, (0, 0, 0, DEFAULT_PADDING)),
        )
        return check_manager.as_dict(as_list)

    for namespace, brokers in get_resources_grouped_by_namespace(all_brokers):
        # Copy the base conditions: the shared list must not grow from one namespace to the next.
        check_manager.add_target(
            target_name=target_brokers, namespace=namespace, conditions=list(broker_conditions)
        )
        check_manager.add_display(
            target_name=target_brokers,
            namespace=namespace,
            display=Padding(f"MQTT Brokers in namespace {{[purple]{namespace}[/purple]}}", (0, 0, 0, DEFAULT_PADDING)),
        )

        brokers = list(brokers)
        brokers_count = len(brokers)
        brokers_count_text = f"- Expecting {colorize_string('1')} broker resource per namespace. "
        broker_color = "green" if brokers_count == 1 else "red"
        brokers_count_text += f"{colorize_string(color=broker_color, value=f'Detected {brokers_count}')}."

        if brokers_count != 1:
            check_manager.set_target_status(
                target_name=target_brokers, namespace=namespace, status=CheckTaskStatus.error.value
            )
        check_manager.add_display(
            target_name=target_brokers,
            namespace=namespace,
            display=Padding(brokers_count_text, (0, 0, 0, DEFAULT_PADDING)),
        )

        # Used by both the per-broker section and the runtime health section below.
        broker_properties_padding = DEFAULT_PADDING + 4
        diagnostic_detail_padding = (0, 0, 0, broker_properties_padding + 4)
        added_distributed_conditions = False
        added_diagnostics_conditions = False

        for b in brokers:
            # Start every broker from success so one broker's result cannot leak into the next.
            broker_eval_status = CheckTaskStatus.success.value
            broker_name = b["metadata"]["name"]
            broker_spec: dict = b["spec"]
            # .get(): a broker without diagnostics is reported below instead of raising KeyError.
            broker_diagnostics = broker_spec.get("diagnostics")
            broker_status_state = b.get("status", {})

            target_broker_text = f"\n- Broker {{{colorize_string(broker_name)}}}"
            check_manager.add_display(
                target_name=target_brokers,
                namespace=namespace,
                display=Padding(target_broker_text, (0, 0, 0, DEFAULT_PADDING)),
            )

            if broker_status_state:
                process_custom_resource_status(
                    check_manager=check_manager,
                    status=broker_status_state,
                    target_name=target_brokers,
                    namespace=namespace,
                    resource_name=broker_name,
                    padding=broker_properties_padding,
                    detail_level=detail_level,
                )

            broker_eval_value = {}

            if not added_distributed_conditions:
                added_distributed_conditions = True
                # Build a new list rather than appending to the shared base list.
                check_manager.set_target_conditions(
                    target_name=target_brokers,
                    namespace=namespace,
                    conditions=broker_conditions + cardinality_conditions,
                )

            broker_cardinality: dict = broker_spec.get("cardinality")
            broker_eval_value["spec.cardinality"] = broker_cardinality
            if not broker_cardinality:
                broker_eval_status = CheckTaskStatus.error.value
                # show cardinality display (regardless of detail level) if it's missing
                check_manager.add_display(
                    target_name=target_brokers,
                    namespace=namespace,
                    display=Padding("\nCardinality", (0, 0, 0, 12)),
                )
                check_manager.add_display(
                    target_name=target_brokers,
                    namespace=namespace,
                    display=Padding(
                        f"cardinality {colorize_string(color='red', value='not detected')}.",
                        (0, 0, 0, 16),
                    ),
                )
            else:
                broker_cardinality_eval_status = _evaluate_broker_cardinality(
                    check_manager=check_manager,
                    broker_cardinality=broker_cardinality,
                    target_brokers=target_brokers,
                    namespace=namespace,
                    padding=broker_properties_padding,
                    detail_level=detail_level,
                )
                if broker_cardinality_eval_status == CheckTaskStatus.error.value:
                    broker_eval_status = CheckTaskStatus.error.value
                elif (
                    broker_cardinality_eval_status == CheckTaskStatus.warning.value
                    and broker_eval_status != CheckTaskStatus.error.value
                ):
                    broker_eval_status = CheckTaskStatus.warning.value

            if not added_diagnostics_conditions:
                check_manager.add_target_conditions(
                    target_name=target_brokers,
                    conditions=["spec.diagnostics"],
                    namespace=namespace,
                )
                added_diagnostics_conditions = True

            broker_eval_value["spec.diagnostics"] = broker_diagnostics

            if broker_diagnostics:
                if detail_level != ResourceOutputDetailLevel.summary.value:
                    check_manager.add_display(
                        target_name=target_brokers,
                        namespace=namespace,
                        display=Padding("\nBroker Diagnostics", (0, 0, 0, 12)),
                    )
                    if detail_level == ResourceOutputDetailLevel.detail.value:
                        process_resource_properties(
                            check_manager=check_manager,
                            detail_level=detail_level,
                            target_name=target_brokers,
                            prop_value=broker_diagnostics,
                            properties=BROKER_DIAGNOSTICS_PROPERTIES,
                            namespace=namespace,
                            padding=diagnostic_detail_padding,
                        )
                    else:
                        process_dict_resource(
                            check_manager=check_manager,
                            target_name=target_brokers,
                            resource=broker_diagnostics,
                            namespace=namespace,
                            padding=diagnostic_detail_padding[3],
                        )
            # show broker diagnostics error regardless of detail_level
            else:
                # Missing diagnostics is a warning, but it must not hide a cardinality error.
                if broker_eval_status != CheckTaskStatus.error.value:
                    broker_eval_status = CheckTaskStatus.warning.value
                check_manager.add_display(
                    target_name=target_brokers,
                    namespace=namespace,
                    display=Padding("\nBroker Diagnostics", (0, 0, 0, 12)),
                )
                check_manager.add_display(
                    target_name=target_brokers,
                    namespace=namespace,
                    display=Padding(
                        colorize_string(color="yellow", value="Unable to fetch broker diagnostics."),
                        diagnostic_detail_padding,
                    ),
                )

            check_manager.add_target_eval(
                target_name=target_brokers,
                namespace=namespace,
                status=broker_eval_status,
                value=broker_eval_value,
                resource_name=broker_name,
            )

            _evaluate_broker_diagnostics_service(
                check_manager=check_manager,
                target_brokers=target_brokers,
                namespace=namespace,
                detail_level=detail_level,
            )

        if brokers_count > 0:
            check_manager.add_display(
                target_name=target_brokers,
                namespace=namespace,
                display=Padding(
                    "\nRuntime Health",
                    (0, 0, 0, DEFAULT_PADDING),
                ),
            )

            pods: List[dict] = []
            for prefix in [
                AIO_BROKER_DIAGNOSTICS_PROBE_PREFIX,
                AIO_BROKER_FRONTEND_PREFIX,
                AIO_BROKER_BACKEND_PREFIX,
                AIO_BROKER_AUTH_PREFIX,
                AIO_BROKER_HEALTH_MANAGER,
                AIO_BROKER_DIAGNOSTICS_SERVICE,
                AIO_BROKER_OPERATOR,
                # AIO_BROKER_FLUENT_BIT,
                # TODO: Fluent Bit is deployed to all nodes and stays in a pending state until an
                # AIO workload is running on the node. For clusters with many nodes, usually
                # some instances of Fluent Bit will be in a pending state. This is expected.
            ]:
                prefixed_pods = get_namespaced_pods_by_prefix(
                    prefix=prefix,
                    namespace=namespace,
                    label_selector=MQ_NAME_LABEL,
                )
                if not prefixed_pods:
                    add_display_and_eval(
                        check_manager=check_manager,
                        target_name=target_brokers,
                        display_text=f"{prefix}* {colorize_string(color='yellow', value='not detected')}.",
                        eval_status=CheckTaskStatus.warning.value,
                        eval_value=None,
                        resource_name=prefix,
                        namespace=namespace,
                        padding=(0, 0, 0, broker_properties_padding),
                    )
                else:
                    # Reuse the namespaced result: a second, cluster-wide query would report
                    # pods of other namespaces under this namespace's target.
                    pods.extend(prefixed_pods)

            evaluate_pod_health(
                check_manager=check_manager,
                target=target_brokers,
                namespace=namespace,
                padding=broker_properties_padding,
                pods=pods,
                detail_level=detail_level,
            )

    return check_manager.as_dict(as_list)
def evaluate_broker_authentications(
    as_list: bool = False,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_name: str = None,
) -> Dict[str, Any]:
    """Evaluate the BrokerAuthentication resources, grouped by namespace.

    Each resource gets its Broker owner reference checked, its status reported
    (when present) and its ``spec.authenticationMethods`` evaluated: at least
    one method is expected, and every method is passed on to
    ``_check_authentication_method``. The collected sub-check results are
    rendered through ``_display_sub_check_results``.
    """
    target_authentications = "brokerauthentications.mqttbroker.iotoperations.azure.com"
    check_manager = CheckManager(
        check_name="evalBrokerAuthentications",
        check_desc="Evaluate MQTT Broker Authentications",
    )
    auth_conditions = ["len(spec.authenticationMethods)"]

    found_authentications = get_resources_by_name(
        api_info=MQ_ACTIVE_API,
        kind=MqResourceKinds.BROKER_AUTHENTICATION,
        resource_name=resource_name,
    )

    if not found_authentications:
        # A name filter without a match is skipped; nothing at all is an error.
        if resource_name:
            missing_status = CheckTaskStatus.skipped.value
        else:
            missing_status = CheckTaskStatus.error.value
        missing_text = f"Unable to fetch {MqResourceKinds.BROKER_AUTHENTICATION.value}s in any namespace."
        check_manager.add_target(target_name=target_authentications)
        check_manager.add_target_eval(
            target_name=target_authentications,
            status=missing_status,
            value=missing_text,
        )
        check_manager.add_display(
            target_name=target_authentications,
            display=Padding(missing_text, (0, 0, 0, DEFAULT_PADDING)),
        )
        return check_manager.as_dict(as_list)

    for namespace, grouped in get_resources_grouped_by_namespace(found_authentications):
        check_manager.add_target(
            target_name=target_authentications,
            namespace=namespace,
            conditions=auth_conditions,
        )
        namespace_header = (
            f"Broker Authentications in namespace {{{colorize_string(color='purple', value=namespace)}}}"
        )
        check_manager.add_display(
            target_name=target_authentications,
            namespace=namespace,
            display=Padding(namespace_header, (0, 0, 0, DEFAULT_PADDING)),
        )

        added_broker_ref_condition = False
        for auth in list(grouped):
            metadata = auth["metadata"]
            auth_name = metadata["name"]

            # Sub-check results are gathered first and rendered together at the end.
            sub_check_results: List[CheckResult] = []

            # Broker owner reference.
            _evaluate_broker_reference(
                check_manager=check_manager,
                owner_reference=metadata.get("ownerReferences", []),
                target_name=target_authentications,
                namespace=namespace,
                resource_name=auth_name,
                added_condition=added_broker_ref_condition,
                display_text=f"\n- Broker Authentication {{{colorize_string(auth_name)}}}.",
            )

            # Resource status, only when the resource reports one.
            resource_status = auth.get("status", {})
            if resource_status:
                process_custom_resource_status(
                    check_manager=check_manager,
                    status=resource_status,
                    target_name=target_authentications,
                    namespace=namespace,
                    resource_name=auth_name,
                    padding=DEFAULT_PADDING + 4,
                    detail_level=detail_level,
                )

            # Authentication methods: at least one is expected.
            auth_methods = auth.get("spec", {}).get("authenticationMethods", [])
            method_count = len(auth_methods)
            methods_text = f"Expecting {colorize_string('>=1')} authentication methods. "
            if method_count < 1:
                methods_text += f"{colorize_string(color='red', value='Not Detected')}."
                methods_status = CheckTaskStatus.error.value
            else:
                methods_text += f"Detected {colorize_string(color='green', value=method_count)}."
                methods_status = CheckTaskStatus.success.value

            sub_check_results.append(
                CheckResult(
                    display=Padding(methods_text, (0, 0, 0, 12)),
                    eval_status=methods_status,
                )
            )
            check_manager.add_target_eval(
                target_name=target_authentications,
                namespace=namespace,
                status=methods_status,
                value={"len(spec.authenticationMethods)": method_count},
                resource_name=auth_name,
            )

            for method in auth_methods:
                _check_authentication_method(
                    check_manager=check_manager,
                    target_authentications=target_authentications,
                    namespace=namespace,
                    resource_name=auth_name,
                    method=method,
                    sub_check_results=sub_check_results,
                    detail_level=detail_level,
                )

            _display_sub_check_results(
                check_manager=check_manager,
                target_name=target_authentications,
                namespace=namespace,
                sub_check_results=sub_check_results,
                parent_padding=DEFAULT_PADDING,
                detail_level=detail_level,
            )

    return check_manager.as_dict(as_list)
def evaluate_broker_authorizations(
    as_list: bool = False,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
    resource_name: str = None,
) -> Dict[str, Any]:
    """Evaluate the BrokerAuthorization resources, grouped by namespace.

    Each resource gets its Broker owner reference checked, its status reported
    (when present) and ``spec.authorizationPolicies`` evaluated (missing
    policies are an error). When no authorization resource is found at all the
    check is recorded as skipped.
    """
    target_authorizations = "brokerauthorizations.mqttbroker.iotoperations.azure.com"
    check_manager = CheckManager(
        check_name="evalBrokerAuthorizations",
        check_desc="Evaluate MQTT Broker Authorizations",
    )
    authz_conditions = ["spec.authorizationPolicies"]

    found_authorizations = get_resources_by_name(
        api_info=MQ_ACTIVE_API,
        kind=MqResourceKinds.BROKER_AUTHORIZATION,
        resource_name=resource_name,
    )

    if not found_authorizations:
        # Unlike the sibling evaluators, this is always recorded as skipped.
        missing_text = f"Unable to fetch {MqResourceKinds.BROKER_AUTHORIZATION.value}s in any namespace."
        check_manager.add_target(target_name=target_authorizations)
        check_manager.add_target_eval(
            target_name=target_authorizations,
            status=CheckTaskStatus.skipped.value,
            value=missing_text,
        )
        check_manager.add_display(
            target_name=target_authorizations,
            display=Padding(missing_text, (0, 0, 0, DEFAULT_PADDING)),
        )
        return check_manager.as_dict(as_list)

    summary_level = ResourceOutputDetailLevel.summary.value
    verbose_level = ResourceOutputDetailLevel.verbose.value

    for namespace, grouped in get_resources_grouped_by_namespace(found_authorizations):
        check_manager.add_target(
            target_name=target_authorizations,
            namespace=namespace,
        )
        namespace_header = f"Broker Authorizations in namespace {{[purple]{namespace}[/purple]}}"
        check_manager.add_display(
            target_name=target_authorizations,
            namespace=namespace,
            display=Padding(namespace_header, (0, 0, 0, DEFAULT_PADDING)),
        )

        added_broker_ref_condition = False
        for authz in list(grouped):
            metadata = authz["metadata"]
            authz_name = metadata["name"]

            # Broker owner reference.
            _evaluate_broker_reference(
                check_manager=check_manager,
                owner_reference=metadata.get("ownerReferences", []),
                target_name=target_authorizations,
                namespace=namespace,
                resource_name=authz_name,
                added_condition=added_broker_ref_condition,
                display_text=f"\n- Broker Authorization {{{colorize_string(authz_name)}}}.",
            )

            authz_properties_padding = DEFAULT_PADDING + 4

            # Resource status, only when the resource reports one.
            resource_status = authz.get("status", {})
            if resource_status:
                process_custom_resource_status(
                    check_manager=check_manager,
                    status=resource_status,
                    target_name=target_authorizations,
                    namespace=namespace,
                    resource_name=authz_name,
                    padding=authz_properties_padding,
                    detail_level=detail_level,
                )

            # Authorization policies: their absence is an error.
            authz_policies = authz.get("spec", {}).get("authorizationPolicies", {})
            if not authz_policies:
                policies_text = f"Authorization Policies {colorize_string(color='red', value='not detected')}."
                policies_status = CheckTaskStatus.error.value
            else:
                policies_text = f"Authorization Policies {colorize_string(color='green', value='detected')}."
                policies_status = CheckTaskStatus.success.value

            # Failures are always displayed; successes only above summary level.
            show_policies_line = not (
                detail_level == summary_level and policies_status == CheckTaskStatus.success.value
            )
            if show_policies_line:
                check_manager.add_display(
                    target_name=target_authorizations,
                    namespace=namespace,
                    display=Padding(policies_text, (0, 0, 0, 12)),
                )

            check_manager.add_target_eval(
                target_name=target_authorizations,
                namespace=namespace,
                status=policies_status,
                value={"spec.authorizationPolicies": authz_policies},
                resource_name=authz_name,
            )

            if detail_level == verbose_level and authz_policies:
                process_dict_resource(
                    check_manager=check_manager,
                    target_name=target_authorizations,
                    resource=authz_policies,
                    namespace=namespace,
                    padding=authz_properties_padding + 2,
                )

        # The policy condition is registered once per namespace, after its resources are evaluated.
        check_manager.add_target_conditions(
            target_name=target_authorizations,
            namespace=namespace,
            conditions=authz_conditions,
        )

    return check_manager.as_dict(as_list)
def _evaluate_broker_reference(
    check_manager: CheckManager,
    owner_reference: List[dict],
    target_name: str,
    namespace: str,
    resource_name: str,
    added_condition: bool,
    display_text: str,
    padding: Optional[int] = DEFAULT_PADDING,
):
    """Validate the Broker owner reference of a broker child resource.

    Looks for an owner reference whose ``kind`` is the Broker kind. When there
    is none the check is skipped: nothing is evaluated or displayed and ``""``
    is returned. Otherwise the referenced name is compared against the names
    returned by ``get_valid_resource_names`` for the namespace, an evaluation
    is recorded for ``resource_name`` and ``display_text`` is displayed followed
    by the validity of the reference.

    Args:
        check_manager: Collector for targets, evaluations and displays.
        owner_reference: The ``metadata.ownerReferences`` entries of the resource.
        target_name: Target the evaluation is recorded under.
        namespace: Namespace of the resource.
        resource_name: Name of the resource being evaluated.
        added_condition: When true the caller states that the broker reference
            condition is already registered, and it is not added again.
        display_text: Text prepended to the reference validity message.
        padding: Left padding of the displayed line.
    """
    # Tolerate a missing list and references without "kind" instead of raising.
    broker_reference = [
        ref
        for ref in (owner_reference or [])
        if (ref.get("kind") or "").lower() == MqResourceKinds.BROKER.value
    ]
    if not broker_reference:
        # skip this check
        return ""

    # should only have one broker reference
    broker_reference_name = broker_reference[0].get("name")

    broker_ref_condition = "valid(brokerRef)"
    if not added_condition:
        # `added_condition` is passed by value, so setting it here never reaches the caller.
        # Look at the conditions already on the target so the condition is registered once
        # per target/namespace rather than once per resource.
        existing_conditions = (
            check_manager.targets.get(target_name, {}).get(namespace, {}).get("conditions", []) or []
        )
        if broker_ref_condition not in existing_conditions:
            check_manager.add_target_conditions(
                target_name=target_name,
                namespace=namespace,
                conditions=[broker_ref_condition],
            )

    valid_broker_refs = get_valid_resource_names(api=MQ_ACTIVE_API, kind=MqResourceKinds.BROKER, namespace=namespace)
    is_valid_ref = broker_reference_name in valid_broker_refs

    ref_color = "green" if is_valid_ref else "red"
    ref_eval_status = CheckTaskStatus.success.value if is_valid_ref else CheckTaskStatus.error.value
    ref_eval_value = {"valid(spec.brokerRef)": is_valid_ref}

    ref_display = (
        f"Broker reference {{{colorize_string(broker_reference_name)}}} is "
        f"{colorize_string(color=ref_color, value='Valid' if is_valid_ref else 'Invalid')}."
    )

    check_manager.add_target_eval(
        target_name=target_name,
        namespace=namespace,
        status=ref_eval_status,
        value=ref_eval_value,
        resource_name=resource_name,
    )
    check_manager.add_display(
        target_name=target_name,
        namespace=namespace,
        display=Padding(f"{display_text} {ref_display}", (0, 0, 0, padding)),
    )
def _evaluate_listener_service(
    check_manager: CheckManager,
    listener_spec: dict,
    processed_services: dict,
    target_listeners: str,
    namespace: str,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
) -> None:
    """Evaluate the Kubernetes service a broker listener is exposed through.

    Registers a ``service/<serviceName>`` target, fetches the service and marks
    its name as processed in ``processed_services``. A missing service is a
    warning. For a found service the checks depend on ``spec.serviceType``:
    a load balancer is expected to have at least one ingress rule, each with an
    ip; a cluster ip service is expected to have ``spec.clusterIP``; node port
    services get no additional checks.

    Args:
        check_manager: Collector for targets, evaluations and displays.
        listener_spec: The listener ``spec``; ``serviceName`` and ``serviceType``
            are read from it.
        processed_services: Mutated in place: the service name is added as a key.
        target_listeners: Listener target, used to display a missing service.
        namespace: Namespace the service is looked up in.
        detail_level: One of the ``ResourceOutputDetailLevel`` values.
    """
    listener_spec_service_name: str = listener_spec["serviceName"]
    listener_spec_service_type: str = listener_spec["serviceType"]

    target_listener_service = f"service/{listener_spec_service_name}"
    listener_service_eval_status = CheckTaskStatus.success.value
    show_details = detail_level != ResourceOutputDetailLevel.summary.value

    check_manager.add_target(
        target_name=target_listener_service,
        namespace=namespace,
        conditions=["listener_service"],
    )

    associated_service: dict = get_namespaced_service(
        name=listener_spec_service_name, namespace=namespace, as_dict=True
    )
    processed_services[listener_spec_service_name] = True

    if not associated_service:
        listener_service_eval_status = CheckTaskStatus.warning.value
        # The message is shown under the listener target, next to the listener it belongs to.
        check_manager.add_display(
            target_name=target_listeners,
            namespace=namespace,
            display=Padding(
                f"\n{colorize_string(color='red', value='Unable')} to fetch service "
                f"{{{colorize_string(color='red', value=listener_spec_service_name)}}}.",
                (0, 0, 0, 12),
            ),
        )
        check_manager.add_target_eval(
            target_name=target_listener_service,
            namespace=namespace,
            status=listener_service_eval_status,
            value={"listener_service": "Unable to fetch service."},
            resource_name=target_listener_service,
        )
        return

    check_manager.add_target_eval(
        target_name=target_listener_service,
        namespace=namespace,
        status=CheckTaskStatus.success.value,
        value={"listener_service": target_listener_service},
        resource_name=target_listener_service,
    )
    check_manager.add_display(
        target_name=target_listener_service,
        namespace=namespace,
        display=Padding(
            f"Service {{{colorize_string(listener_spec_service_name)}}} "
            f"of type {colorize_string(listener_spec_service_type)}",
            (0, 0, 0, DEFAULT_PADDING),
        ),
    )

    service_type = listener_spec_service_type.lower()
    if service_type == "loadbalancer":
        check_manager.add_target_conditions(
            target_name=target_listener_service,
            namespace=namespace,
            conditions=[
                "status",
                "len(status.loadBalancer.ingress[*].ip)>=1",
            ],
        )
        ingress_rules_desc = f"- Expecting {colorize_string('>=1')} ingress rule. "

        service_status = associated_service.get("status", {})
        load_balancer = service_status.get("loadBalancer", {})
        ingress_rules: List[dict] = load_balancer.get("ingress", [])

        if not ingress_rules:
            listener_service_eval_status = CheckTaskStatus.warning.value
            ingress_color = "red"
        else:
            ingress_color = "green"

        # Same "Detected N" wording for zero and non-zero counts.
        ingress_count_colored = (
            f"{colorize_string(color=ingress_color, value=f'Detected {len(ingress_rules)}')}."
        )

        if show_details:
            check_manager.add_display(
                target_name=target_listener_service,
                namespace=namespace,
                display=Padding(
                    ingress_rules_desc + ingress_count_colored,
                    (0, 0, 0, 12),
                ),
            )
            # The header is only useful together with the per-rule rows shown below.
            if ingress_rules:
                check_manager.add_display(
                    target_name=target_listener_service,
                    namespace=namespace,
                    display=Padding("\nIngress", (0, 0, 0, 12)),
                )

        for ingress in ingress_rules:
            ip = ingress.get("ip")
            if not ip:
                # An ingress rule without an ip downgrades the service to a warning.
                listener_service_eval_status = CheckTaskStatus.warning.value
            elif show_details:
                rule_desc = f"- ip: {colorize_string(color='green', value=ip)}"
                check_manager.add_display(
                    target_name=target_listener_service,
                    namespace=namespace,
                    display=Padding(rule_desc, (0, 0, 0, 16)),
                )

        check_manager.add_target_eval(
            target_name=target_listener_service,
            namespace=namespace,
            status=listener_service_eval_status,
            value={"status": service_status},
        )

    elif service_type == "clusterip":
        check_manager.add_target_conditions(
            target_name=target_listener_service,
            namespace=namespace,
            conditions=["spec.clusterIP"],
        )
        cluster_ip = associated_service.get("spec", {}).get("clusterIP")

        cluster_ip_desc = "Cluster IP: {}"
        if not cluster_ip:
            listener_service_eval_status = CheckTaskStatus.warning.value
            cluster_ip_desc = cluster_ip_desc.format(colorize_string(color="yellow", value="Undetermined"))
        else:
            cluster_ip_desc = cluster_ip_desc.format(colorize_string(cluster_ip))

        if show_details:
            check_manager.add_display(
                target_name=target_listener_service,
                namespace=namespace,
                display=Padding(cluster_ip_desc, (0, 0, 0, 12)),
            )
        check_manager.add_target_eval(
            target_name=target_listener_service,
            namespace=namespace,
            status=listener_service_eval_status,
            value={"spec.clusterIP": cluster_ip},
        )

    elif service_type == "nodeport":
        # No additional checks for node port services.
        pass
def _evaluate_broker_diagnostics_service(
    check_manager: CheckManager,
    target_brokers: str,
    namespace: str,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
) -> None:
    """Evaluate and display the presence of the broker diagnostics service.

    Looks up the service named ``AIO_BROKER_DIAGNOSTICS_SERVICE`` in ``namespace``.
    When it is missing, an error evaluation and a "not detected" line are added to
    the ``target_brokers`` target. When it exists, a success evaluation carrying the
    service's ``clusterIP`` and ``ports`` is added together with a "detected" line;
    for any detail level other than summary, every port is listed as well, followed
    by a blank line.

    Args:
        check_manager: Collector receiving the evaluations and display entries.
        target_brokers: Name of the check target the results are attached to.
        namespace: Namespace in which the service is looked up.
        detail_level: Requested output detail level; summary hides the port list.
    """
    service_resource = f"service/{AIO_BROKER_DIAGNOSTICS_SERVICE}"
    service = get_namespaced_service(name=AIO_BROKER_DIAGNOSTICS_SERVICE, namespace=namespace, as_dict=True)

    if not service:
        check_manager.add_target_eval(
            target_name=target_brokers,
            namespace=namespace,
            status=CheckTaskStatus.error.value,
            value=f"{service_resource} not found in namespace {namespace}",
            resource_name=service_resource,
        )
        missing_suffix = colorize_string(color="red", value="not detected")
        missing_text = f"Diagnostics Service {{{colorize_string(AIO_BROKER_DIAGNOSTICS_SERVICE)}}} {missing_suffix}."
        check_manager.add_display(
            target_name=target_brokers,
            namespace=namespace,
            display=Padding(missing_text, (0, 0, 0, 12)),
        )
        return

    service_spec = service.get("spec", {})
    cluster_ip = service_spec.get("clusterIP")
    ports: List[dict] = service_spec.get("ports", [])

    check_manager.add_target_eval(
        target_name=target_brokers,
        namespace=namespace,
        status=CheckTaskStatus.success.value,
        value={"spec": {"clusterIP": cluster_ip, "ports": ports}},
        resource_name=service_resource,
    )
    found_suffix = colorize_string(color="green", value="detected")
    found_text = f"\nDiagnostics Service {{{colorize_string(AIO_BROKER_DIAGNOSTICS_SERVICE)}}} {found_suffix}."
    check_manager.add_display(
        target_name=target_brokers,
        namespace=namespace,
        display=Padding(found_text, (0, 0, 0, 12)),
    )

    # The port listing is only part of the non-summary output.
    if not ports or detail_level == ResourceOutputDetailLevel.summary.value:
        return

    for port_entry in ports:
        name_text = colorize_string(port_entry.get("name"))
        port_text = colorize_string(port_entry.get("port"))
        protocol_text = colorize_string(port_entry.get("protocol"))
        check_manager.add_display(
            target_name=target_brokers,
            namespace=namespace,
            display=Padding(
                f"{name_text} port {port_text} protocol {protocol_text}",
                (0, 0, 0, 16),
            ),
        )
    # Blank line separating the port list from whatever is displayed next.
    check_manager.add_display(target_name=target_brokers, namespace=namespace, display=NewLine())
def _display_sub_check_results(
    check_manager: CheckManager,
    target_name: str,
    namespace: str,
    sub_check_results: List[CheckResult],
    parent_padding: int,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
) -> None:
    """Add the collected sub-check results to the display, filtered by detail level.

    - summary: only results whose status is error are shown; their text has leading
      and trailing ``-`` and space characters stripped and is re-padded to
      ``parent_padding + 4``, after every result has been inspected.
    - detail: every result that has an evaluation status is shown as-is.
    - verbose: every result is shown as-is.

    Args:
        check_manager: Collector receiving the display entries.
        target_name: Name of the check target the displays are attached to.
        namespace: Namespace the displays are attached to.
        sub_check_results: Results gathered by the individual sub-checks.
        parent_padding: Left padding of the parent line, used to align summary errors.
        detail_level: Requested output detail level.
    """
    summary_mode = detail_level == ResourceOutputDetailLevel.summary.value
    detail_mode = detail_level == ResourceOutputDetailLevel.detail.value
    verbose_mode = detail_level == ResourceOutputDetailLevel.verbose.value

    summary_errors = []
    for sub_result in sub_check_results:
        if summary_mode and sub_result.eval_status == CheckTaskStatus.error.value:
            # Collected now, rendered later with padding aligned to the parent.
            summary_errors.append(sub_result.display.renderable.strip("- "))
            continue

        if verbose_mode or (detail_mode and sub_result.eval_status is not None):
            check_manager.add_display(
                target_name=target_name,
                namespace=namespace,
                display=sub_result.display,
            )

    error_padding = (0, 0, 0, parent_padding + 4)
    for error_text in summary_errors:
        check_manager.add_display(
            target_name=target_name,
            namespace=namespace,
            display=Padding(error_text, error_padding),
        )
def _check_authentication_method(
    method: dict,
    check_manager: CheckManager,
    target_authentications: str,
    namespace: str,
    sub_check_results: List[CheckResult],
    resource_name: str,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
) -> None:
    """Evaluate a single ``spec.authenticationMethods`` entry.

    Dispatches on the entry's ``method`` value (case-insensitive): ``custom``,
    ``x509`` or ``serviceAccountToken``. Anything else, including a missing value,
    is reported as an error. Display lines are appended to ``sub_check_results``;
    the conditions that were checked are merged into the target's condition list
    and one evaluation for the method is added for ``resource_name``. Referenced
    config maps / secrets get their own separate evaluations.

    Args:
        method: The authentication method entry to evaluate.
        check_manager: Collector receiving conditions and evaluations.
        target_authentications: Name of the check target results are attached to.
        namespace: Namespace of the resource, also used to validate references.
        sub_check_results: List that display results are appended to (mutated).
        resource_name: Resource name recorded with the method evaluation.
        detail_level: Requested output detail level. At summary level the audiences
            line is only appended when it reports an error.
    """
    success_status = CheckTaskStatus.success.value
    error_status = CheckTaskStatus.error.value

    conditions = []
    method_type = method.get("method")
    # A missing (None) or non-string method must fall through to the
    # "unknown / not found" branch instead of raising on ``.lower()``.
    method_kind = method_type.lower() if isinstance(method_type, str) else ""
    method_eval_status = success_status
    method_eval_value = {"method": method}

    if method_kind == "custom":
        # The helper cannot update our local status through its string parameter,
        # so take the resulting status from its return value.
        method_eval_status = _evaluate_custom_authentication_method(
            conditions=conditions,
            method=method,
            method_type=method_type,
            check_manager=check_manager,
            target_authentications=target_authentications,
            namespace=namespace,
            sub_check_results=sub_check_results,
            method_eval_status=method_eval_status,
        )
    elif method_kind == "x509":
        conditions.append("spec.authenticationMethods[*].x509Settings")
        setting = method.get("x509Settings") or {}
        if not setting:
            method_display = (
                f"- x509 Method: {{{colorize_string(method_type)}}} {colorize_string(color='red', value='not found')}."
            )
            method_eval_status = error_status
        else:
            method_display = f"- x509 method: {{{colorize_string(method_type)}}} {colorize_string(color='green', value='detected')}."

        sub_check_results.append(
            CheckResult(
                display=Padding(method_display, (0, 0, 0, 16)),
                eval_status=method_eval_status,
            )
        )

        # authorizationAttributes
        auth_attrs = (setting.get("authorizationAttributes") or {}).get("additionalProperties")
        if auth_attrs:
            conditions.append("spec.authenticationMethods[*].x509Settings.authorizationAttributes")

            # attributes
            attributes = auth_attrs.get("attributes")
            if not attributes:
                attributes_display = f"Authorization Attributes {colorize_string(color='red', value='not found')}."
                attributes_status = error_status
                method_eval_status = error_status
            else:
                attributes_display = f"Authorization Attributes: {colorize_string(attributes)} {colorize_string(color='green', value='detected')}."
                attributes_status = success_status
            sub_check_results.append(
                CheckResult(
                    display=Padding(attributes_display, (0, 0, 0, 20)),
                    # Carry the real outcome so a missing value is not filtered out as a success.
                    eval_status=attributes_status,
                )
            )

            # subject
            subject = auth_attrs.get("subject")
            if not subject:
                subject_display = f"Subject {colorize_string(color='red', value='not found')}."
                subject_status = error_status
                method_eval_status = error_status
            else:
                subject_display = (
                    f"Subject: {colorize_string(subject)} {colorize_string(color='green', value='detected')}."
                )
                subject_status = success_status
            sub_check_results.append(
                CheckResult(
                    display=Padding(subject_display, (0, 0, 0, 20)),
                    eval_status=subject_status,
                )
            )

        # trustedClientCaCert
        trusted_client_ca_cert = setting.get("trustedClientCaCert")
        if trusted_client_ca_cert:
            trusted_client_ca_cert_value = {
                "spec.authenticationMethods[*].x509Settings.trustedClientCaCert": trusted_client_ca_cert
            }
            is_valid = validate_runtime_resource_ref(
                namespace=namespace,
                name=trusted_client_ca_cert,
                ref_type=ValidationResourceType.configmap,
            )
            if is_valid:
                configmap_validate_text = f"[green]Valid[/green] {ValidationResourceType.configmap.value} reference {{[green]{trusted_client_ca_cert}[/green]}}."
            else:
                configmap_validate_text = f"[red]Invalid[/red] {ValidationResourceType.configmap.value} reference {{[red]{trusted_client_ca_cert}[/red]}}."
            trusted_client_ca_cert_display = "Trusted Client CA Cert: {}"
            trusted_client_ca_cert_status = success_status if is_valid else error_status
            sub_check_results.append(
                CheckResult(
                    display=Padding(trusted_client_ca_cert_display.format(configmap_validate_text), (0, 0, 0, 20)),
                    eval_status=trusted_client_ca_cert_status,
                )
            )
            conditions.append("valid(spec.authenticationMethods[*].x509Settings.trustedClientCaCert)")
            # The reference is evaluated separately from the method itself.
            check_manager.add_target_eval(
                target_name=target_authentications,
                namespace=namespace,
                status=trusted_client_ca_cert_status,
                value=trusted_client_ca_cert_value,
                resource_name=f"configmap/{trusted_client_ca_cert}",
            )
    elif method_kind == "serviceaccounttoken":
        conditions.append("spec.authenticationMethods[*].serviceAccountTokenSettings")
        setting = method.get("serviceAccountTokenSettings") or {}
        if not setting:
            method_display = f"- Service Account Token Method: {{{colorize_string(method_type)}}} {colorize_string(color='red', value='not found')}."
            method_eval_status = error_status
        else:
            method_display = f"- Service Account Token Method: {{{colorize_string(method_type)}}} {colorize_string(color='green', value='detected')}."

        sub_check_results.append(
            CheckResult(
                display=Padding(method_display, (0, 0, 0, 16)),
                eval_status=method_eval_status,
            )
        )

        # audiences
        audiences = setting.get("audiences")
        conditions.append("spec.authenticationMethods[*].serviceAccountTokenSettings.audiences")
        if not audiences:
            audiences_display = f"Audiences {colorize_string(color='red', value='not found')}."
            audiences_status = error_status
            method_eval_status = error_status
        else:
            audiences_display = (
                f"Audiences: {colorize_string(str(audiences))} {colorize_string(color='green', value='detected')}."
            )
            audiences_status = success_status

        # Successful audiences are detail-only output; a missing value is an error
        # and is recorded at every level so it is not hidden in the summary.
        if detail_level != ResourceOutputDetailLevel.summary.value or audiences_status == error_status:
            sub_check_results.append(
                CheckResult(
                    display=Padding(audiences_display, (0, 0, 0, 20)),
                    eval_status=audiences_status,
                )
            )
    else:
        conditions.append("spec.authenticationMethods[*].method")
        method_display = (
            f"- Unknown method type: {colorize_string(color='red', value=method_type)}."
            if method_type
            else f"- Method {colorize_string(color='red', value='not found')}."
        )
        method_eval_status = error_status

        sub_check_results.append(
            CheckResult(
                display=Padding(method_display, (0, 0, 0, 16)),
                eval_status=method_eval_status,
            )
        )

    # Merge with the conditions already recorded for this target, dropping
    # duplicates while keeping a stable, first-seen order.
    existing_conditions = (
        check_manager.targets.get(target_authentications, {}).get(namespace, {}).get("conditions") or []
    )
    conditions = list(dict.fromkeys([*existing_conditions, *conditions]))
    check_manager.set_target_conditions(
        target_name=target_authentications,
        namespace=namespace,
        conditions=conditions,
    )

    check_manager.add_target_eval(
        target_name=target_authentications,
        namespace=namespace,
        status=method_eval_status,
        value=method_eval_value,
        resource_name=resource_name,
    )


def _evaluate_custom_authentication_method(
    conditions: List[str],
    method: dict,
    method_type: str,
    check_manager: CheckManager,
    target_authentications: str,
    namespace: str,
    sub_check_results: List[CheckResult],
    method_eval_status: str,
) -> str:
    """Evaluate the ``customSettings`` of a custom authentication method.

    Checks that ``customSettings`` exists and that its ``endpoint`` is present and
    starts with ``https://``. When configured, the x509 client certificate secret
    reference and the CA certificate config map reference are validated and
    recorded as their own evaluations. Configured HTTP headers are added as a
    display-only result with no status.

    Args:
        conditions: List that the checked condition names are appended to (mutated).
        method: The authentication method entry being evaluated.
        method_type: The entry's ``method`` value, used for display.
        check_manager: Collector receiving the reference evaluations.
        target_authentications: Name of the check target results are attached to.
        namespace: Namespace used to validate secret / config map references.
        sub_check_results: List that display results are appended to (mutated).
        method_eval_status: Status of the method before this evaluation.

    Returns:
        The method status after the ``customSettings`` and endpoint checks: the
        incoming status, or error if either check failed. Reference validation
        results do not alter it; they are reported through their own evaluations.
    """
    success_status = CheckTaskStatus.success.value
    error_status = CheckTaskStatus.error.value

    conditions.append("spec.authenticationMethods[*].customSettings")
    setting = method.get("customSettings") or {}
    if not setting:
        method_color = "red"
        method_eval_status = error_status
        method_status = "not found"
    else:
        method_color = "green"
        method_status = "detected"

    method_display = (
        f"- Custom Method: {{{colorize_string(method_type)}}} "
        f"{colorize_string(color=method_color, value=method_status)}."
    )
    sub_check_results.append(
        CheckResult(
            display=Padding(method_display, (0, 0, 0, 16)),
            eval_status=method_eval_status,
        )
    )

    # endpoint
    endpoint = setting.get("endpoint", "")
    conditions.append("spec.authenticationMethods[*].customSettings.endpoint")
    if not endpoint:
        endpoint_display = f"Endpoint {colorize_string(color='red', value='not found')}."
        method_eval_status = error_status
    elif not endpoint.lower().startswith("https://"):
        endpoint_display = f"Endpoint: {colorize_string(color='red', value='Invalid')} endpoint format {{{colorize_string(endpoint)}}}."
        method_eval_status = error_status
    else:
        endpoint_display = (
            f"Endpoint: {{{colorize_string(endpoint)}}} {colorize_string(color='green', value='detected')}."
        )

    sub_check_results.append(
        CheckResult(
            display=Padding(endpoint_display, (0, 0, 0, 20)),
            eval_status=method_eval_status,
        )
    )

    # auth
    auth = setting.get("auth")
    if auth:
        # check x509
        secret_ref = (auth.get("x509") or {}).get("secretRef")
        secret_ref_value = {"spec.authenticationMethods[*].customSettings.auth.x509.secretRef": secret_ref}
        is_valid = validate_runtime_resource_ref(
            namespace=namespace,
            name=secret_ref,
            ref_type=ValidationResourceType.secret,
        )
        if is_valid:
            secret_validate_text = f"[green]Valid[/green] {ValidationResourceType.secret.value} reference {{[green]{secret_ref}[/green]}}."
        else:
            secret_validate_text = (
                f"[red]Invalid[/red] {ValidationResourceType.secret.value} reference {{[red]{secret_ref}[/red]}}."
            )
        secret_ref_display = "X.509 Client Certificate Secret reference: {}"
        secret_ref_status = success_status if is_valid else error_status
        sub_check_results.append(
            CheckResult(
                display=Padding(secret_ref_display.format(secret_validate_text), (0, 0, 0, 20)),
                eval_status=secret_ref_status,
            )
        )
        conditions.append("valid(spec.authenticationMethods[*].customSettings.auth.x509.secretRef)")
        # add eval separately for secret ref
        check_manager.add_target_eval(
            target_name=target_authentications,
            namespace=namespace,
            status=secret_ref_status,
            value=secret_ref_value,
            resource_name=f"secret/{secret_ref}",
        )

    # caCertConfigMap
    ca_cert_config_map = setting.get("caCertConfigMap")
    if ca_cert_config_map:
        ca_cert_config_map_value = {"spec.authenticationMethods[*].customSettings.caCertConfigMap": ca_cert_config_map}
        is_valid = validate_runtime_resource_ref(
            namespace=namespace,
            name=ca_cert_config_map,
            ref_type=ValidationResourceType.configmap,
        )
        if is_valid:
            configmap_validate_text = f"[green]Valid[/green] {ValidationResourceType.configmap.value} reference {{[green]{ca_cert_config_map}[/green]}}."
        else:
            configmap_validate_text = f"[red]Invalid[/red] {ValidationResourceType.configmap.value} reference {{[red]{ca_cert_config_map}[/red]}}."
        ca_cert_config_map_display = "CA Certificate Config Map: {}"
        ca_cert_config_map_status = success_status if is_valid else error_status
        sub_check_results.append(
            CheckResult(
                display=Padding(ca_cert_config_map_display.format(configmap_validate_text), (0, 0, 0, 20)),
                eval_status=ca_cert_config_map_status,
            )
        )
        conditions.append("valid(spec.authenticationMethods[*].customSettings.caCertConfigMap)")
        check_manager.add_target_eval(
            target_name=target_authentications,
            namespace=namespace,
            status=ca_cert_config_map_status,
            value=ca_cert_config_map_value,
            resource_name=f"configmap/{ca_cert_config_map}",
        )

    # headers
    headers = (setting.get("headers") or {}).get("additionalProperties")
    if headers:
        sub_check_results.append(
            CheckResult(
                display=Padding(f"HTTP Headers: {colorize_string(headers)}", (0, 0, 0, 20)),
                # Informational only: no evaluation status is attached.
                eval_status=None,
            ),
        )

    return method_eval_status
def _evaluate_broker_cardinality(
    broker_cardinality: dict,
    check_manager: CheckManager,
    target_brokers: str,
    namespace: str,
    padding: int,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
) -> str:
    """Check that every broker cardinality value is at least 1.

    Reads ``backendChain.partitions``, ``backendChain.redundancyFactor``,
    ``backendChain.workers`` and ``frontend.replicas`` from ``broker_cardinality``.
    A value that is missing, falsy or below 1 turns the result into an error. For
    any detail level other than summary, a "Cardinality" header and one
    expected-vs-actual line per value are added to the display.

    Args:
        broker_cardinality: The cardinality settings to inspect.
        check_manager: Collector receiving the display entries.
        target_brokers: Name of the check target the displays are attached to.
        namespace: Namespace the displays are attached to.
        padding: Base left padding; the per-value lines use ``padding + 4``.
        detail_level: Requested output detail level.

    Returns:
        The success status when all four values pass, otherwise the error status.
    """
    expectation = colorize_string(">=1")

    backend_chain = broker_cardinality.get("backendChain", {})
    frontend = broker_cardinality.get("frontend", {})

    # (line prefix, actual value) in the order the lines are displayed.
    expectations = (
        ("- Expecting backend partitions", backend_chain.get("partitions")),
        ("- Expecting backend redundancy factor", backend_chain.get("redundancyFactor")),
        ("- Expecting backend workers", backend_chain.get("workers")),
        ("- Expecting frontend replicas", frontend.get("replicas")),
    )

    broker_eval_status = CheckTaskStatus.success.value
    cardinality_lines = []
    for prefix, actual in expectations:
        if actual and actual >= 1:
            color = "green"
        else:
            color = "red"
            broker_eval_status = CheckTaskStatus.error.value
        actual_colored = colorize_string(color=color, value=f"Actual {actual}")
        cardinality_lines.append(f"{prefix} {expectation}. {actual_colored}.")

    # show cardinality display on non-summary detail_levels
    if detail_level != ResourceOutputDetailLevel.summary.value:
        check_manager.add_display(
            target_name=target_brokers,
            namespace=namespace,
            display=Padding("\nCardinality", (0, 0, 0, 12)),
        )
        line_padding = (0, 0, 0, padding + 4)
        for line in cardinality_lines:
            check_manager.add_display(
                target_name=target_brokers,
                namespace=namespace,
                display=Padding(line, line_padding),
            )

    return broker_eval_status