azext_edge/edge/providers/support_bundle.py (208 lines of code) (raw):

# coding=utf-8 # ---------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License file in the project root for license information. # ---------------------------------------------------------------------------------------------- from typing import List, Optional from zipfile import ZipFile, ZIP_DEFLATED import yaml from knack.log import get_logger from rich.console import Console, NewLine from ..common import OpsServiceType from ..providers.edge_api import ( CERTMANAGER_API_V1, CLUSTER_CONFIG_API_V1, CONTAINERSTORAGE_API_V1, MQTT_BROKER_API_V1, DEVICEREGISTRY_API_V1, DATAFLOW_API_V1, META_API_V1, ARCCONTAINERSTORAGE_API_V1, SECRETSYNC_API_V1, SECRETSTORE_API_V1, TRUSTMANAGER_API_V1, AZUREMONITOR_API_V1, EdgeApiManager, ) logger = get_logger(__name__) console = Console() COMPAT_CERTMANAGER_APIS = EdgeApiManager(resource_apis=[CERTMANAGER_API_V1, TRUSTMANAGER_API_V1]) COMPAT_CLUSTER_CONFIG_APIS = EdgeApiManager(resource_apis=[CLUSTER_CONFIG_API_V1]) COMPAT_MQTT_BROKER_APIS = EdgeApiManager(resource_apis=[MQTT_BROKER_API_V1]) COMPAT_DEVICEREGISTRY_APIS = EdgeApiManager(resource_apis=[DEVICEREGISTRY_API_V1]) COMPAT_DATAFLOW_APIS = EdgeApiManager(resource_apis=[DATAFLOW_API_V1]) COMPAT_META_APIS = EdgeApiManager(resource_apis=[META_API_V1]) COMPAT_ARCCONTAINERSTORAGE_APIS = EdgeApiManager(resource_apis=[ARCCONTAINERSTORAGE_API_V1, CONTAINERSTORAGE_API_V1]) COMPAT_SECRETSTORE_APIS = EdgeApiManager(resource_apis=[SECRETSYNC_API_V1, SECRETSTORE_API_V1]) COMPAT_AZUREMONITOR_APIS = EdgeApiManager(resource_apis=[AZUREMONITOR_API_V1]) def build_bundle( bundle_path: str, log_age_seconds: Optional[int] = None, ops_services: Optional[List[str]] = None, include_mq_traces: Optional[bool] = None, ): from rich.live import Live from rich.progress import Progress from rich.table import Table from .support.billing import prepare_bundle as prepare_billing_bundle from .support.mq import prepare_bundle as prepare_mq_bundle from .support.connectors import prepare_bundle as prepare_connector_bundle from .support.dataflow import prepare_bundle as prepare_dataflow_bundle from .support.deviceregistry import prepare_bundle as prepare_deviceregistry_bundle from .support.shared import prepare_bundle as prepare_shared_bundle from .support.akri import prepare_bundle as prepare_akri_bundle from .support.arcagents import prepare_bundle as prepare_arcagents_bundle from .support.meta import prepare_bundle as prepare_meta_bundle from .support.schemaregistry import prepare_bundle as prepare_schema_registry_bundle from .support.arccontainerstorage import prepare_bundle as prepare_arccontainerstorage_bundle from .support.secretstore import prepare_bundle as prepare_secretstore_bundle from .support.azuremonitor import prepare_bundle as prepare_azuremonitor_bundle from .support.certmanager import prepare_bundle as prepare_certmanager_bundle from .support.meso import prepare_bundle as prepare_meso_bundle def collect_default_works( pending_work: dict, log_age_seconds: Optional[int] = None, ): # arc agent resources pending_work["arcagents"] = prepare_arcagents_bundle(log_age_seconds) # Collect common resources if any AIO service is deployed with any service selected. pending_work["common"] = prepare_shared_bundle() # Collect meta resources if any AIO service is deployed with any service selected. deployed_meta_apis = COMPAT_META_APIS.get_deployed() pending_work["meta"] = prepare_meta_bundle(log_age_seconds, deployed_meta_apis) pending_work = {k: {} for k in OpsServiceType.list()} api_map = { OpsServiceType.mq.value: {"apis": COMPAT_MQTT_BROKER_APIS, "prepare_bundle": prepare_mq_bundle}, OpsServiceType.billing.value: { "apis": COMPAT_CLUSTER_CONFIG_APIS, "prepare_bundle": prepare_billing_bundle, }, OpsServiceType.connectors.value: { "apis": None, "prepare_bundle": prepare_connector_bundle, }, OpsServiceType.akri.value: {"apis": None, "prepare_bundle": prepare_akri_bundle}, OpsServiceType.deviceregistry.value: { "apis": COMPAT_DEVICEREGISTRY_APIS, "prepare_bundle": prepare_deviceregistry_bundle, }, OpsServiceType.dataflow.value: { "apis": COMPAT_DATAFLOW_APIS, "prepare_bundle": prepare_dataflow_bundle, }, OpsServiceType.schemaregistry.value: { "apis": None, "prepare_bundle": prepare_schema_registry_bundle, }, OpsServiceType.arccontainerstorage.value: { "apis": COMPAT_ARCCONTAINERSTORAGE_APIS, "prepare_bundle": prepare_arccontainerstorage_bundle, }, OpsServiceType.secretstore.value: { "apis": COMPAT_SECRETSTORE_APIS, "prepare_bundle": prepare_secretstore_bundle, }, OpsServiceType.azuremonitor.value: { "apis": COMPAT_AZUREMONITOR_APIS, "prepare_bundle": prepare_azuremonitor_bundle, }, OpsServiceType.certmanager.value: { "apis": COMPAT_CERTMANAGER_APIS, "prepare_bundle": prepare_certmanager_bundle, }, OpsServiceType.meso.value: { "apis": None, "prepare_bundle": prepare_meso_bundle, }, } if not ops_services: parsed_ops_services = OpsServiceType.list() else: # remove duplicates parsed_ops_services = list(set(ops_services)) for ops_service in parsed_ops_services: # assign key and value to service_moniker and api_info service_moniker = [k for k, _ in api_map.items() if k == ops_service][0] api_info = api_map.get(service_moniker) deployed_apis = api_info["apis"].get_deployed() if api_info["apis"] else None if not deployed_apis and service_moniker not in [ OpsServiceType.schemaregistry.value, OpsServiceType.akri.value, OpsServiceType.connectors.value, OpsServiceType.meso.value, ]: expected_api_version = api_info["apis"].as_str() logger.warning( f"The following API(s) were not detected {expected_api_version}. " f"CR capture for {service_moniker} will be skipped. " "Still attempting capture of runtime resources..." ) # still try fetching other resources even crds are not available due to api version mismatch bundle_method = api_info["prepare_bundle"] # Check if the function takes a second argument # TODO: Change to kwargs based pattern if service_moniker == OpsServiceType.deviceregistry.value: bundle = bundle_method(deployed_apis) elif service_moniker == OpsServiceType.mq.value: bundle = bundle_method(log_age_seconds, deployed_apis, include_mq_traces) elif service_moniker in [ OpsServiceType.schemaregistry.value, OpsServiceType.akri.value, OpsServiceType.connectors.value, OpsServiceType.meso.value, ]: bundle = bundle_method(log_age_seconds) else: bundle = bundle_method(log_age_seconds, deployed_apis) pending_work[service_moniker].update(bundle) collect_default_works(pending_work, log_age_seconds) total_work_count = 0 for service in pending_work: total_work_count = total_work_count + len(pending_work[service]) bundle = {service: {} for service, _ in pending_work.items()} grid = Table.grid(expand=False) with Live(grid, console=console, transient=True) as live: uber_progress = Progress() uber_task = uber_progress.add_task( "[green]Building support bundle", total=total_work_count, ) def visually_process(description: str, support_segment: dict, ops_service: str): namespace_task = uber_progress.add_task(f"[cyan]{description}", total=len(support_segment)) for element in support_segment: header = f"Fetching [medium_purple4]{element}[/medium_purple4] data..." grid = Table.grid(expand=False) grid.add_column() grid.add_row(NewLine(1)) grid.add_row(header) grid.add_row(NewLine(1)) grid.add_row(uber_progress) live.update(grid, refresh=True) try: # Produce as much support collateral as possible. bundle[ops_service][element] = support_segment[element]() except Exception as e: # @digimaun - bdb.BdbQuit? logger.debug(f"Unable to process {ops_service} {element}:\n{e}") finally: if not uber_progress.finished: uber_progress.update(namespace_task, advance=1) uber_progress.update(uber_task, advance=1) for service in pending_work: if pending_work[service]: visually_process( description=f"Processing {service}", support_segment=pending_work[service], ops_service=service, ) write_zip(file_path=bundle_path, bundle=bundle) return {"bundlePath": bundle_path} def write_zip(bundle: dict, file_path: str): with ZipFile(file=file_path, mode="w", compression=ZIP_DEFLATED) as myzip: todo: List[dict] = [] for ops_service in bundle: for element in bundle[ops_service]: if isinstance(bundle[ops_service][element], list): todo.extend(bundle[ops_service][element]) else: todo.append(bundle[ops_service][element]) added_path = {} for t in todo: if t: data = t.get("data") zinfo = t.get("zinfo") if data and zinfo not in added_path: if isinstance(data, dict): data = yaml.safe_dump(t["data"], indent=2) myzip.writestr(zinfo_or_arcname=zinfo, data=data) added_path[zinfo] = True def str_presenter(dumper, data): if "\n" in data: return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|") return dumper.represent_scalar("tag:yaml.org,2002:str", data) yaml.representer.SafeRepresenter.add_representer(str, str_presenter)