azext_edge/edge/providers/support/mq.py (102 lines of code) (raw):

# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------

from functools import partial
from typing import Iterable, Optional
from zipfile import ZipInfo

from knack.log import get_logger

from ..edge_api import MQ_ACTIVE_API, EdgeResourceApi
from ..stats import get_traces
from .base import (
    DAY_IN_SECONDS,
    assemble_crd_work,
    get_mq_namespaces,
    process_config_maps,
    process_daemonsets,
    process_jobs,
    process_replicasets,
    process_services,
    process_statefulset,
    process_v1_pods,
)
from .common import NAME_LABEL_FORMAT

logger = get_logger(__name__)

# Label selector and bundle sub-directory shared by every MQ fetcher below.
MQ_NAME_LABEL = NAME_LABEL_FORMAT.format(label=MQ_ACTIVE_API.label)
MQ_DIRECTORY_PATH = MQ_ACTIVE_API.moniker


def fetch_diagnostic_traces() -> list:
    """Collect support-bundle traces from every MQ namespace.

    Calls ``get_traces`` once per namespace returned by ``get_mq_namespaces``
    and wraps each returned trace in a ``{"data": ..., "zinfo": ZipInfo}`` dict
    whose archive path is ``<namespace>/<MQ_DIRECTORY_PATH>/traces/<filename>``.

    This is best-effort: any exception raised while processing a namespace is
    logged at debug level and that namespace is skipped.

    Returns:
        A list of ``{"data", "zinfo"}`` dicts; empty when nothing was collected.
    """
    result = []
    for namespace in get_mq_namespaces():
        try:
            traces = get_traces(namespace=namespace, trace_ids=["!support_bundle!"])
            if not traces:
                continue

            # NOTE(review): each trace is indexed as a pair -- element 0 supplies
            # ``filename``/``date_time`` and element 1 is the payload; presumably
            # (ZipInfo, bytes), confirm against ``get_traces``.
            for trace in traces:
                zinfo = ZipInfo(
                    filename=f"{namespace}/{MQ_DIRECTORY_PATH}/traces/{trace[0].filename}",
                    date_time=trace[0].date_time,
                )
                # Fixed in Py 3.9 https://github.com/python/cpython/issues/70373
                zinfo.file_size = 0
                zinfo.compress_size = 0
                result.append(
                    {
                        "data": trace[1],
                        "zinfo": zinfo,
                    }
                )

        except Exception:
            logger.debug(f"Unable to process diagnostics pod traces against namespace {namespace}.")

    return result


def fetch_statefulsets():
    """Return the result of ``process_statefulset`` for MQ-labelled resources."""
    return process_statefulset(
        directory_path=MQ_DIRECTORY_PATH,
        label_selector=MQ_NAME_LABEL,
    )


def fetch_daemonsets():
    """Return the result of ``process_daemonsets`` for MQ-labelled resources."""
    return process_daemonsets(
        directory_path=MQ_DIRECTORY_PATH,
        label_selector=MQ_NAME_LABEL,
    )


def fetch_services():
    """Return the result of ``process_services`` for MQ-labelled resources."""
    return process_services(
        directory_path=MQ_DIRECTORY_PATH,
        label_selector=MQ_NAME_LABEL,
    )


def fetch_replicasets():
    """Return the result of ``process_replicasets`` for MQ-labelled resources."""
    return process_replicasets(
        directory_path=MQ_DIRECTORY_PATH,
        label_selector=MQ_NAME_LABEL,
    )


def fetch_jobs():
    """Return the result of ``process_jobs`` for MQ-labelled resources."""
    return process_jobs(
        directory_path=MQ_DIRECTORY_PATH,
        label_selector=MQ_NAME_LABEL,
    )


def fetch_configmaps():
    """Return the result of ``process_config_maps`` for MQ-labelled resources."""
    return process_config_maps(
        directory_path=MQ_DIRECTORY_PATH,
        label_selector=MQ_NAME_LABEL,
    )


def fetch_pods(since_seconds: int = DAY_IN_SECONDS):
    """Return the result of ``process_v1_pods`` for MQ-labelled pods.

    Args:
        since_seconds: Forwarded unchanged to ``process_v1_pods``; defaults to
            ``DAY_IN_SECONDS``.
    """
    return process_v1_pods(
        directory_path=MQ_DIRECTORY_PATH,
        label_selector=MQ_NAME_LABEL,
        since_seconds=since_seconds,
    )


# Fetchers that are always part of an MQ bundle. Treated as a read-only
# template: ``prepare_bundle`` copies it instead of adding per-call entries.
support_runtime_elements = {
    "statefulsets": fetch_statefulsets,
    "configmaps": fetch_configmaps,
    "jobs": fetch_jobs,
    "replicasets": fetch_replicasets,
    "services": fetch_services,
    "daemonsets": fetch_daemonsets,
}


def prepare_bundle(
    log_age_seconds: int = DAY_IN_SECONDS,
    apis: Optional[Iterable[EdgeResourceApi]] = None,
    include_mq_traces: Optional[bool] = None,
) -> dict:
    """Assemble the work items that make up the MQ portion of a support bundle.

    Args:
        log_age_seconds: Bound into the ``"pods"`` fetcher as ``since_seconds``.
        apis: When truthy, passed to ``assemble_crd_work`` and the resulting
            entries are merged into the returned dict first.
        include_mq_traces: When truthy, a ``"traces"`` entry pointing at
            ``fetch_diagnostic_traces`` is added.

    Returns:
        A dict of work items: the ``assemble_crd_work`` entries (if any),
        followed by the runtime fetchers keyed by element name.
    """
    mq_to_run = {}

    # when apis not found, still try to fetch other resources
    if apis:
        mq_to_run.update(assemble_crd_work(apis))

    # Work on a per-call copy: writing "pods"/"traces" into the module-level
    # dict would leak one call's options (notably include_mq_traces=True) into
    # every later call made in the same process.
    runtime_elements = dict(support_runtime_elements)
    runtime_elements["pods"] = partial(fetch_pods, since_seconds=log_age_seconds)
    if include_mq_traces:
        runtime_elements["traces"] = fetch_diagnostic_traces

    mq_to_run.update(runtime_elements)

    return mq_to_run