azext_edge/edge/providers/check/base/resource.py (412 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from enum import Enum
from itertools import groupby
from knack.log import get_logger
from kubernetes.client.models import (
V1APIResource,
V1APIResourceList,
)
from rich.padding import Padding
from typing import Any, Dict, List, Optional, Tuple, Union
from azext_edge.edge.providers.k8s.config_map import get_config_map
from .check_manager import CheckManager
from .display import process_value_color
from ..common import COLOR_STR_FORMAT, PADDING_SIZE, ResourceOutputDetailLevel, ValidationResourceType
from ...base import get_cluster_custom_api, get_namespaced_secret
from ...edge_api import EdgeResourceApi
from ....common import CheckTaskStatus, ResourceState
# TODO: refactor
logger = get_logger(__name__)
def decorate_resource_status(status: str) -> str:
from ....common import ResourceState
return COLOR_STR_FORMAT.format(color=ResourceState.map_to_color(status), value=status)
def enumerate_ops_service_resources(
api_info: EdgeResourceApi,
check_name: str,
check_desc: str,
as_list: bool = False,
excluded_resources: Optional[List[str]] = None,
) -> Tuple[dict, dict]:
resource_kind_map = {}
target_api = api_info.as_str()
check_manager = CheckManager(check_name=check_name, check_desc=check_desc)
check_manager.add_target(target_name=target_api)
api_resources: V1APIResourceList = get_cluster_custom_api(group=api_info.group, version=api_info.version)
if not api_resources:
check_manager.add_target_eval(target_name=target_api, status=CheckTaskStatus.error.value)
missing_api_text = f"[bright_blue]{target_api}[/bright_blue] API resources [red]not[/red] detected."
check_manager.add_display(target_name=target_api, display=Padding(missing_api_text, (0, 0, 0, 8)))
return check_manager.as_dict(as_list), resource_kind_map
api_header_display = Padding(f"[bright_blue]{target_api}[/bright_blue] API resources", (0, 0, 0, 8))
check_manager.add_display(target_name=target_api, display=api_header_display)
for resource in api_resources.resources:
r: V1APIResource = resource
if excluded_resources and r.name in excluded_resources:
continue
if r.kind not in resource_kind_map:
resource_kind_map[r.kind] = True
check_manager.add_display(
target_name=target_api,
display=Padding(f"[cyan]{r.kind}[/cyan]", (0, 0, 0, 12)),
)
check_manager.add_target_eval(
target_name=target_api,
status=CheckTaskStatus.success.value,
value=list(resource_kind_map.keys()),
)
return check_manager.as_dict(as_list), resource_kind_map
def filter_resources_by_name(
resources: List[dict],
resource_name: str,
) -> List[dict]:
from fnmatch import fnmatch
if not resource_name:
return resources
resource_name = resource_name.lower()
resources = [
resource
for resource in resources
if fnmatch(get_resource_metadata_property(resource, prop_name="name"), resource_name)
]
return resources
def filter_resources_by_namespace(resources: List[dict], namespace: str) -> List[dict]:
return [resource for resource in resources if _get_namespace(resource) == namespace]
def generate_target_resource_name(api_info: EdgeResourceApi, resource_kind: str) -> str:
resource_plural = api_info._kinds[resource_kind] if api_info._kinds else f"{resource_kind}s"
return f"{resource_plural}.{api_info.group}"
def _get_namespace(resource):
return get_resource_metadata_property(resource, prop_name="namespace")
def get_resources_by_name(
api_info: EdgeResourceApi,
kind: Union[str, Enum],
resource_name: str,
namespace: str = None,
) -> List[dict]:
resources: list = api_info.get_resources(kind=kind, namespace=namespace).get("items", [])
resources = filter_resources_by_name(resources, resource_name)
return resources
def get_resources_grouped_by_namespace(resources: List[dict]):
resources.sort(key=_get_namespace)
return groupby(resources, key=_get_namespace)
# get either name or namespace from resource that might be a object or a dict
def get_resource_metadata_property(resource: Union[dict, Any], prop_name: str) -> Union[str, None]:
if isinstance(resource, dict):
return resource.get("metadata", {}).get(prop_name)
return getattr(resource.metadata, prop_name, None) if hasattr(resource, "metadata") else None
def process_dict_resource(
check_manager: CheckManager,
target_name: str,
resource: dict,
namespace: str,
padding: int,
prop_name: Optional[str] = None,
) -> None:
if prop_name:
check_manager.add_display(
target_name=target_name, namespace=namespace, display=Padding(f"{prop_name}:", (0, 0, 0, padding))
)
padding += PADDING_SIZE
for key, value in resource.items():
if isinstance(value, dict):
check_manager.add_display(
target_name=target_name, namespace=namespace, display=Padding(f"{key}:", (0, 0, 0, padding))
)
process_dict_resource(
check_manager=check_manager,
target_name=target_name,
resource=value,
namespace=namespace,
padding=padding + PADDING_SIZE,
)
elif isinstance(value, list):
if len(value) == 0:
continue
display_text = f"{key}:"
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(display_text, (0, 0, 0, padding)),
)
process_list_resource(
check_manager=check_manager,
target_name=target_name,
resource=value,
namespace=namespace,
padding=padding + PADDING_SIZE,
)
else:
display_text = f"{key}: "
value_padding = padding
if isinstance(value, str) and len(value) > 50:
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(display_text, (0, 0, 0, padding)),
)
value_padding += PADDING_SIZE
display_text = ""
display_text += process_value_color(
check_manager=check_manager, target_name=target_name, key=key, value=value
)
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(display_text, (0, 0, 0, value_padding)),
)
def process_list_resource(
check_manager: CheckManager, target_name: str, resource: List[dict], namespace: str, padding: int
) -> None:
for item in resource:
name = ""
if isinstance(item, dict):
name = item.pop("name", None)
# when name property exists, use name as header; if not, use property type and index as header
if name:
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(f"- name: [cyan]{name}[/cyan]", (0, 0, 0, padding)),
)
else:
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(f"- item {resource.index(item) + 1}", (0, 0, 0, padding)),
)
if isinstance(item, dict):
process_dict_resource(
check_manager=check_manager,
target_name=target_name,
resource=item,
namespace=namespace,
padding=padding + 2,
)
elif isinstance(item, str):
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(f"[cyan]{item}[/cyan]", (0, 0, 0, padding + 2)),
)
def process_resource_properties(
check_manager: CheckManager,
detail_level: int,
target_name: str,
prop_value: Dict[str, Any],
properties: Dict[str, Any],
namespace: str,
padding: tuple,
) -> None:
if not prop_value:
return
for prop, display_name, verbose_only in properties:
keys = prop.split(".")
value = prop_value
for key in keys:
value = value.get(key)
if value is None:
break
if prop == "descriptor":
value = value if detail_level == ResourceOutputDetailLevel.verbose.value else value[:10] + "..."
if verbose_only and detail_level != ResourceOutputDetailLevel.verbose.value:
continue
process_resource_property_by_type(
check_manager=check_manager,
target_name=target_name,
properties=value,
display_name=display_name,
namespace=namespace,
padding=padding,
)
def process_resource_property_by_type(
check_manager: CheckManager,
target_name: str,
properties: Any,
display_name: str,
namespace: str,
padding: tuple,
) -> None:
padding_left = padding[3]
if isinstance(properties, list):
if len(properties) == 0:
return
display_text = f"{display_name}:"
check_manager.add_display(target_name=target_name, namespace=namespace, display=Padding(display_text, padding))
for property in properties:
display_text = f"- {display_name} {properties.index(property) + 1}"
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(display_text, (0, 0, 0, padding_left + 2)),
)
for prop, value in property.items():
display_text = f"{prop}: [cyan]{value}[/cyan]"
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(display_text, (0, 0, 0, padding_left + PADDING_SIZE)),
)
elif isinstance(properties, str) or isinstance(properties, bool) or isinstance(properties, int):
properties = str(properties) if properties else "undefined"
if len(properties) < 50:
display_text = f"{display_name}: [cyan]{properties}[/cyan]"
else:
check_manager.add_display(
target_name=target_name, namespace=namespace, display=Padding(f"{display_name}:", padding)
)
display_text = f"[cyan]{properties}[/cyan]"
padding = (0, 0, 0, padding_left + 4)
check_manager.add_display(target_name=target_name, namespace=namespace, display=Padding(display_text, padding))
elif isinstance(properties, dict):
display_text = f"{display_name}:"
check_manager.add_display(target_name=target_name, namespace=namespace, display=Padding(display_text, padding))
for prop, value in properties.items():
display_text = f"{prop}: [cyan]{value}[/cyan]"
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(display_text, (0, 0, 0, padding_left + 2)),
)
def validate_one_of_conditions(
conditions: List[tuple],
check_manager: CheckManager,
eval_value: dict,
namespace: str,
target_name: str,
padding: int,
resource_name: Optional[str] = None,
) -> None:
if len(conditions) == 1:
return
non_empty_conditions_count = len([condition for condition in conditions if condition[1]])
eval_status = CheckTaskStatus.success.value
conditions_names = ", ".join([f"'{condition[0]}'" for condition in conditions])
if non_empty_conditions_count == 0:
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(
f"One of {conditions_names} should be specified",
(0, 0, 0, padding),
),
)
eval_status = CheckTaskStatus.error.value
elif non_empty_conditions_count > 1:
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(
f"Only one of {conditions_names} should be specified",
(0, 0, 0, padding),
),
)
eval_status = CheckTaskStatus.error.value
one_of_condition = f"oneOf({conditions_names})"
check_manager.add_target_conditions(target_name=target_name, namespace=namespace, conditions=[one_of_condition])
check_manager.add_target_eval(
target_name=target_name,
namespace=namespace,
status=eval_status,
value=eval_value,
resource_name=resource_name,
)
def combine_statuses(status_list: List[str]):
# lower case status list
status_list = [status.lower() for status in status_list]
final_status = "success"
for status in status_list:
if final_status == "success" and status not in ["running", "succeeded", "ok"]:
final_status = status
elif final_status in ["warning", "skipped", "warn", "starting", "recovering"] and status == "error":
final_status = status
return final_status
def calculate_status(resource_state: str) -> str:
return ResourceState.map_to_status(resource_state).value
def process_custom_resource_status(
check_manager: CheckManager,
status: dict,
target_name: str,
namespace: str,
resource_name: str,
padding: int,
detail_level: int = ResourceOutputDetailLevel.summary.value,
) -> None:
runtime_status = status.get("runtimeStatus", {})
provisioning_status = status.get("provisioningStatus", {})
check_manager.add_target_conditions(
target_name=target_name,
conditions=["status"],
namespace=namespace,
)
if not runtime_status and not provisioning_status:
# if no status for both runtime and provisioning, set status to error
check_manager.add_target_eval(
target_name=target_name,
namespace=namespace,
status=CheckTaskStatus.error.value,
value={"status": status},
resource_name=resource_name,
)
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding("Status [red]not found[/red].", (0, 0, 0, padding)),
)
return
status_eval_value = {"status": status}
status_list = []
if runtime_status:
runtime_status_state = runtime_status.get("status")
status_list.append(calculate_status(runtime_status_state))
if provisioning_status:
provisioning_status_state = provisioning_status.get("status")
status_list.append(calculate_status(provisioning_status_state))
# combine runtime and provisioning status
status_eval_status = combine_statuses(status_list)
check_manager.add_target_eval(
target_name=target_name,
namespace=namespace,
status=status_eval_status,
value=status_eval_value,
resource_name=resource_name,
)
if detail_level == ResourceOutputDetailLevel.summary.value:
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(
f"Status {{{decorate_resource_status(status_eval_status)}}}.",
(0, 0, 0, padding),
),
)
else:
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding("Status:", (0, 0, 0, padding)),
)
for prop_name, prop_value in {
"Provisioning Status": provisioning_status,
"Runtime Status": runtime_status,
}.items():
if prop_value:
status_text = f"{prop_name} {{{decorate_resource_status(prop_value.get('status'))}}}."
if detail_level == ResourceOutputDetailLevel.verbose.value:
status_description = prop_value.get("description") or prop_value.get("output", {}).get("message")
if status_description:
status_text = status_text.replace(".", f", [cyan]{status_description}[/cyan].")
check_manager.add_display(
target_name=target_name,
namespace=namespace,
display=Padding(status_text, (0, 0, 0, padding + 4)),
)
def validate_runtime_resource_ref(name: str, namespace: str, ref_type: ValidationResourceType) -> bool:
ref_obj = None
if ref_type == ValidationResourceType.secret:
ref_obj = get_namespaced_secret(secret_name=name, namespace=namespace)
elif ref_type == ValidationResourceType.configmap:
ref_obj = get_config_map(name=name, namespace=namespace)
else:
raise ValueError(f"Unsupported ref type: {ref_type}")
return bool(ref_obj)
def get_valid_resource_names(
api: EdgeResourceApi, kind: Union[Enum, str], namespace: Optional[str] = None
) -> List[str]:
custom_objects = api.get_resources(kind=kind, namespace=namespace)
if custom_objects:
objects: List[dict] = custom_objects.get("items", [])
if objects:
return [get_resource_metadata_property(object, prop_name="name") for object in objects]
return []