azext_edge/edge/providers/support/base.py

# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------

from pathlib import PurePath
from typing import List, Dict, Optional, Iterable, Tuple, TypeVar, Union
from functools import partial

from azext_edge.edge.common import BundleResourceKind, PodState
from knack.log import get_logger
from kubernetes.client.exceptions import ApiException
from kubernetes.client.models import (
    V1Container,
    V1ObjectMeta,
    V1PodSpec,
    V1PodList,
    V1ServiceList,
    V1DeploymentList,
    V1StatefulSetList,
    V1ReplicaSetList,
    V1DaemonSetList,
    V1PersistentVolumeClaimList,
    V1JobList,
    V1CronJobList,
)

from ..edge_api import EdgeResourceApi
from ..base import client, get_custom_objects
from ...util import get_timestamp_now_utc

logger = get_logger(__name__)
generic = client.ApiClient()

DAY_IN_SECONDS: int = 60 * 60 * 24
POD_STATUS_FAILED_EVICTED: str = "evicted"

K8sRuntimeResources = TypeVar(
    "K8sRuntimeResources",
    V1ServiceList,
    V1PodList,
    V1DeploymentList,
    V1StatefulSetList,
    V1ReplicaSetList,
    V1DaemonSetList,
    V1PersistentVolumeClaimList,
    V1JobList,
    V1CronJobList,
)


def process_crd(
    group: str,
    version: str,
    kind: str,
    plural: str,
    directory_path: str,
    file_prefix: Optional[str] = None,
    fallback_namespace: Optional[str] = None,
) -> List[dict]:
    result: dict = get_custom_objects(
        group=group,
        version=version,
        plural=plural,
        use_cache=False,
    )
    if not file_prefix:
        file_prefix = kind

    processed = []
    for r in result.get("items", []):
        # Try to get the namespace from metadata; use fallback_namespace if provided.
        try:
            namespace = r["metadata"]["namespace"]
        except KeyError:
            if fallback_namespace:
                namespace = fallback_namespace
            else:
                logger.debug("Namespace not found in CRD metadata and no fallback namespace provided.")
                # Without a namespace the bundle path cannot be built; skip this resource.
                continue
        name = r["metadata"]["name"]
        processed.append(
            {
                "data": r,
                "zinfo": f"{namespace}/{directory_path}/{file_prefix}.{version}.{name}.yaml",
            }
        )

    return processed
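
# Usage sketch for process_crd (illustrative only; the group/version/kind/plural
# values below are hypothetical, not defined by this module):
#
#   entries = process_crd(
#       group="example.microsoft.com",
#       version="v1",
#       kind="Widget",
#       plural="widgets",
#       directory_path="widgets",
#   )
#   # Each entry is shaped like:
#   # {"data": <raw custom object>, "zinfo": "<namespace>/widgets/Widget.v1.<name>.yaml"}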


def process_v1_pods(
    directory_path: str,
    capture_previous_logs: bool = True,
    include_metrics: bool = False,
    since_seconds: int = DAY_IN_SECONDS,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
    pod_prefix_for_init_container_logs: Optional[List[str]] = None,
    exclude_prefixes: Optional[List[str]] = None,
    namespace: Optional[str] = None,
) -> List[dict]:
    from kubernetes.client.models import V1Pod

    v1_api = client.CoreV1Api()
    custom_api = client.CustomObjectsApi()

    processed = []
    if not prefix_names:
        prefix_names = []

    if namespace:
        pods: V1PodList = v1_api.list_namespaced_pod(namespace=namespace, label_selector=label_selector)
    else:
        pods: V1PodList = v1_api.list_pod_for_all_namespaces(label_selector=label_selector)

    if exclude_prefixes:
        pods = exclude_resources_with_prefix(pods, exclude_prefixes)

    pod_logger_info = f"Detected {len(pods.items)} pods"
    if label_selector:
        pod_logger_info = f"{pod_logger_info} with label '{label_selector}'."
    logger.info(pod_logger_info)

    for pod in pods.items:
        p: V1Pod = pod
        pod_metadata: V1ObjectMeta = p.metadata
        pod_namespace: str = pod_metadata.namespace
        pod_name: str = pod_metadata.name
        if prefix_names:
            matched_prefix = [pod_name.startswith(prefix) for prefix in prefix_names]
            if not any(matched_prefix):
                continue
        # TODO: Workaround
        p.api_version = pods.api_version
        p.kind = "Pod"
        processed.append(
            {
                "data": generic.sanitize_for_serialization(obj=p),
                "zinfo": f"{pod_namespace}/{directory_path}/pod.{pod_name}.yaml",
            }
        )
        pod_spec: V1PodSpec = p.spec
        pod_containers: List[V1Container] = pod_spec.containers
        if pod_prefix_for_init_container_logs:
            # Check if the pod name starts with any prefix in pod_prefix_for_init_container_logs.
            if any(pod_name.startswith(prefix) for prefix in pod_prefix_for_init_container_logs):
                init_pod_containers: List[V1Container] = pod_spec.init_containers
                # init_containers can be None; guard before extending.
                if init_pod_containers:
                    pod_containers.extend(init_pod_containers)

        # Exclude evicted pods from log capture since they are not accessible.
        pod_status = pod.status
        if (
            pod_status
            and pod_status.phase == PodState.failed.value
            and str(pod_status.reason).lower() == POD_STATUS_FAILED_EVICTED
        ):
            logger.info(f"Pod {pod_name} in namespace {pod_namespace} is evicted. Skipping log capture.")
        else:
            processed.extend(
                _capture_pod_container_logs(
                    directory_path=directory_path,
                    pod_containers=pod_containers,
                    pod_name=pod_name,
                    pod_namespace=pod_namespace,
                    v1_api=v1_api,
                    since_seconds=since_seconds,
                    capture_previous_logs=capture_previous_logs,
                )
            )

        if include_metrics:
            try:
                logger.debug(f"Fetching runtime metrics for {pod_name}")
                metric: dict = custom_api.get_namespaced_custom_object(
                    "metrics.k8s.io", "v1", pod_namespace, "pods", pod_name
                )
                if metric:
                    processed.append(
                        {
                            "data": metric,
                            "zinfo": f"{pod_namespace}/{directory_path}/pod.{pod_name}.metric.yaml",
                        }
                    )
            except ApiException as e:
                logger.debug(e.body)

    return processed
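
# Usage sketch for process_v1_pods (the directory path and label selector below
# are hypothetical examples, not values this module defines):
#
#   entries = process_v1_pods(
#       directory_path="connectors",
#       label_selector="app=my-app",
#       since_seconds=60 * 60,
#       include_metrics=True,
#   )
#   # Yields one "pod.<name>.yaml" entry per pod, plus "pod.<name>.<container>.log"
#   # (and ".previous.log") entries, and "pod.<name>.metric.yaml" when the
#   # metrics.k8s.io API returns data.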


def process_deployments(
    directory_path: str,
    field_selector: Optional[str] = None,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
    exclude_prefixes: Optional[List[str]] = None,
    namespace: Optional[str] = None,
) -> List[dict]:
    v1_apps = client.AppsV1Api()

    if namespace:
        deployments: V1DeploymentList = v1_apps.list_namespaced_deployment(
            namespace=namespace, label_selector=label_selector, field_selector=field_selector
        )
    else:
        deployments: V1DeploymentList = v1_apps.list_deployment_for_all_namespaces(
            label_selector=label_selector, field_selector=field_selector
        )

    return _process_kubernetes_resources(
        directory_path=directory_path,
        resources=deployments,
        prefix_names=prefix_names,
        kind=BundleResourceKind.deployment.value,
        exclude_prefixes=exclude_prefixes,
    )


def process_statefulset(
    directory_path: str,
    return_namespaces: bool = False,
    field_selector: Optional[str] = None,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
    namespace: Optional[str] = None,
) -> Union[Tuple[List[dict], dict], List[dict]]:
    v1_apps = client.AppsV1Api()

    if namespace:
        statefulsets: V1StatefulSetList = v1_apps.list_namespaced_stateful_set(
            namespace=namespace, label_selector=label_selector, field_selector=field_selector
        )
    else:
        statefulsets: V1StatefulSetList = v1_apps.list_stateful_set_for_all_namespaces(
            label_selector=label_selector, field_selector=field_selector
        )

    namespace_pods_work = {}
    processed = _process_kubernetes_resources(
        directory_path=directory_path,
        resources=statefulsets,
        kind=BundleResourceKind.statefulset.value,
        prefix_names=prefix_names,
    )

    for statefulset in statefulsets.items:
        statefulset_namespace: str = statefulset.metadata.namespace
        if statefulset_namespace not in namespace_pods_work:
            namespace_pods_work[statefulset_namespace] = True

    if return_namespaces:
        return processed, namespace_pods_work

    return processed
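
# Usage sketch for process_statefulset with return_namespaces=True (the label
# selector and namespace shown are hypothetical examples):
#
#   entries, namespaces = process_statefulset(
#       directory_path="broker",
#       label_selector="app=my-broker",
#       return_namespaces=True,
#   )
#   # namespaces maps each namespace owning a matched statefulset to True,
#   # e.g. {"azure-iot-operations": True}, so callers can scope follow-up pod work.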


def process_services(
    directory_path: str,
    field_selector: Optional[str] = None,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
    exclude_prefixes: Optional[List[str]] = None,
    namespace: Optional[str] = None,
) -> List[dict]:
    v1_api = client.CoreV1Api()

    if namespace:
        services: V1ServiceList = v1_api.list_namespaced_service(
            namespace=namespace, label_selector=label_selector, field_selector=field_selector
        )
    else:
        services: V1ServiceList = v1_api.list_service_for_all_namespaces(
            label_selector=label_selector, field_selector=field_selector
        )

    return _process_kubernetes_resources(
        directory_path=directory_path,
        resources=services,
        prefix_names=prefix_names,
        kind=BundleResourceKind.service.value,
        exclude_prefixes=exclude_prefixes,
    )


def process_replicasets(
    directory_path: str,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
    exclude_prefixes: Optional[List[str]] = None,
    namespace: Optional[str] = None,
) -> List[dict]:
    v1_apps = client.AppsV1Api()

    if namespace:
        replicasets: V1ReplicaSetList = v1_apps.list_namespaced_replica_set(
            namespace=namespace, label_selector=label_selector
        )
    else:
        replicasets: V1ReplicaSetList = v1_apps.list_replica_set_for_all_namespaces(label_selector=label_selector)

    return _process_kubernetes_resources(
        directory_path=directory_path,
        resources=replicasets,
        prefix_names=prefix_names,
        kind=BundleResourceKind.replicaset.value,
        exclude_prefixes=exclude_prefixes,
    )


def process_daemonsets(
    directory_path: str,
    field_selector: Optional[str] = None,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
    namespace: Optional[str] = None,
) -> List[dict]:
    v1_apps = client.AppsV1Api()

    if namespace:
        daemonsets: V1DaemonSetList = v1_apps.list_namespaced_daemon_set(
            namespace=namespace, label_selector=label_selector, field_selector=field_selector
        )
    else:
        daemonsets: V1DaemonSetList = v1_apps.list_daemon_set_for_all_namespaces(
            label_selector=label_selector, field_selector=field_selector
        )

    return _process_kubernetes_resources(
        directory_path=directory_path,
        resources=daemonsets,
        prefix_names=prefix_names,
        kind=BundleResourceKind.daemonset.value,
    )


def process_config_maps(
    directory_path: str,
    field_selector: Optional[str] = None,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
    namespace: Optional[str] = None,
) -> List[dict]:
    v1_api = client.CoreV1Api()

    if namespace:
        config_maps = v1_api.list_namespaced_config_map(
            namespace=namespace, label_selector=label_selector, field_selector=field_selector
        )
    else:
        config_maps = v1_api.list_config_map_for_all_namespaces(
            label_selector=label_selector, field_selector=field_selector
        )

    return _process_kubernetes_resources(
        directory_path=directory_path,
        resources=config_maps,
        prefix_names=prefix_names,
        kind=BundleResourceKind.configmap.value,
    )


def process_cluster_roles(
    directory_path: str,
    label_selector: Optional[str] = None,
    field_selector: Optional[str] = None,
) -> List[dict]:
    from kubernetes.client.models import V1ClusterRoleList

    processed = []
    rbac_api = client.RbacAuthorizationV1Api()

    cluster_roles: V1ClusterRoleList = rbac_api.list_cluster_role(
        label_selector=label_selector, field_selector=field_selector
    )

    for role in cluster_roles.items:
        # Cluster roles are cluster-scoped; derive a namespace from the Helm
        # release annotation so the entry can be placed in the bundle tree.
        # Note annotations can be None, so guard before calling .get().
        namespace = (role.metadata.annotations or {}).get("meta.helm.sh/release-namespace")
        name = role.metadata.name
        resource_type = _get_resource_type_prefix(BundleResourceKind.clusterrole.value)

        if namespace:
            processed.append(
                {
                    "data": generic.sanitize_for_serialization(obj=role),
                    "zinfo": f"{namespace}/{directory_path}/{resource_type}.{name}.yaml",
                }
            )

    return processed


def process_cluster_role_bindings(
    directory_path: str,
    label_selector: Optional[str] = None,
    field_selector: Optional[str] = None,
) -> List[dict]:
    from kubernetes.client.models import V1ClusterRoleBindingList

    processed = []
    rbac_api = client.RbacAuthorizationV1Api()

    cluster_role_bindings: V1ClusterRoleBindingList = rbac_api.list_cluster_role_binding(
        label_selector=label_selector, field_selector=field_selector
    )

    for binding in cluster_role_bindings.items:
        namespace = (binding.metadata.annotations or {}).get("meta.helm.sh/release-namespace")
        name = binding.metadata.name
        resource_type = _get_resource_type_prefix(BundleResourceKind.clusterrolebinding.value)

        if namespace:
            processed.append(
                {
                    "data": generic.sanitize_for_serialization(obj=binding),
                    "zinfo": f"{namespace}/{directory_path}/{resource_type}.{name}.yaml",
                }
            )

    return processed
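
# Capture note for cluster-scoped RBAC (illustrative): only roles and bindings
# carrying the Helm "meta.helm.sh/release-namespace" annotation are kept, since
# the bundle layout is namespace-first. For example, a cluster role annotated
# with release-namespace "azure-iot-operations" would be written under
# "azure-iot-operations/<directory_path>/...", while an unannotated one is skipped.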


def process_nodes() -> Dict[str, Union[dict, str]]:
    return {
        "data": generic.sanitize_for_serialization(obj=client.CoreV1Api().list_node()),
        "zinfo": "nodes.yaml",
    }


def get_mq_namespaces() -> List[str]:
    from ..edge_api import MQ_ACTIVE_API, MqResourceKinds

    namespaces = []
    cluster_brokers = MQ_ACTIVE_API.get_resources(MqResourceKinds.BROKER)
    if cluster_brokers and cluster_brokers["items"]:
        namespaces.extend([b["metadata"]["namespace"] for b in cluster_brokers["items"]])

    return namespaces


def process_events() -> List[dict]:
    event_content = []

    core_v1_api = client.CoreV1Api()
    event_content.append(
        {
            "data": generic.sanitize_for_serialization(obj=core_v1_api.list_event_for_all_namespaces()),
            "zinfo": "events.yaml",
        }
    )

    return event_content


def process_storage_classes() -> List[dict]:
    storage_class_content = []

    storage_v1_api = client.StorageV1Api()
    storage_class_content.append(
        {
            "data": generic.sanitize_for_serialization(obj=storage_v1_api.list_storage_class()),
            "zinfo": "storage-classes.yaml",
        }
    )

    return storage_class_content


def process_persistent_volume_claims(
    directory_path: str,
    field_selector: Optional[str] = None,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
    namespace: Optional[str] = None,
) -> List[dict]:
    v1_api = client.CoreV1Api()

    if namespace:
        pvcs: V1PersistentVolumeClaimList = v1_api.list_namespaced_persistent_volume_claim(
            namespace=namespace, label_selector=label_selector, field_selector=field_selector
        )
    else:
        pvcs: V1PersistentVolumeClaimList = v1_api.list_persistent_volume_claim_for_all_namespaces(
            label_selector=label_selector, field_selector=field_selector
        )

    return _process_kubernetes_resources(
        directory_path=directory_path,
        resources=pvcs,
        prefix_names=prefix_names,
        kind=BundleResourceKind.pvc.value,
    )


def process_jobs(
    directory_path: str,
    field_selector: Optional[str] = None,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
    exclude_prefixes: Optional[List[str]] = None,
) -> List[dict]:
    batch_v1_api = client.BatchV1Api()

    jobs: V1JobList = batch_v1_api.list_job_for_all_namespaces(
        label_selector=label_selector, field_selector=field_selector
    )

    return _process_kubernetes_resources(
        directory_path=directory_path,
        resources=jobs,
        prefix_names=prefix_names,
        kind=BundleResourceKind.job.value,
        exclude_prefixes=exclude_prefixes,
    )


def process_cron_jobs(
    directory_path: str,
    field_selector: Optional[str] = None,
    label_selector: Optional[str] = None,
    prefix_names: Optional[List[str]] = None,
) -> List[dict]:
    batch_v1_api = client.BatchV1Api()

    cron_jobs: V1CronJobList = batch_v1_api.list_cron_job_for_all_namespaces(
        label_selector=label_selector, field_selector=field_selector
    )

    return _process_kubernetes_resources(
        directory_path=directory_path,
        resources=cron_jobs,
        prefix_names=prefix_names,
        kind=BundleResourceKind.cronjob.value,
    )


def assemble_crd_work(
    apis: Iterable[EdgeResourceApi],
    file_prefix_map: Optional[Dict[str, str]] = None,
    directory_path: Optional[str] = None,
    fallback_namespace: Optional[str] = None,
) -> dict:
    if not file_prefix_map:
        file_prefix_map = {}

    result = {}
    for api in apis:
        path = directory_path or api.moniker
        for kind in api.kinds:
            file_prefix = file_prefix_map.get(kind)
            result[f"{api.moniker} {api.version} {kind}"] = partial(
                process_crd,
                group=api.group,
                version=api.version,
                kind=kind,
                plural=api._kinds[kind],  # TODO: optimize
                directory_path=path,
                file_prefix=file_prefix,
                fallback_namespace=fallback_namespace,
            )

    return result


def get_bundle_path(bundle_dir: Optional[str] = None, system_name: str = "aio") -> PurePath:
    from ...util import normalize_dir

    bundle_dir_pure_path = normalize_dir(bundle_dir)
    bundle_pure_path = bundle_dir_pure_path.joinpath(default_bundle_name(system_name))
    return bundle_pure_path


def default_bundle_name(system_name: str) -> str:
    timestamp = get_timestamp_now_utc(format="%Y%m%dT%H%M%S")
    return f"support_bundle_{timestamp}_{system_name}.zip"
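
# Usage sketch for assemble_crd_work: it returns deferred work, not data. Each
# value is a functools.partial over process_crd, keyed by "<moniker> <version> <kind>",
# so the bundle builder can invoke collectors lazily (SOME_EDGE_API below is a
# hypothetical EdgeResourceApi instance):
#
#   work = assemble_crd_work(apis=[SOME_EDGE_API])
#   for key, collector in work.items():
#       entries = collector()  # runs process_crd with the bound arguments
#
# default_bundle_name("aio") produces names such as
# "support_bundle_20240101T120000_aio.zip" (timestamp shown is illustrative).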


def _capture_pod_container_logs(
    directory_path: str,
    pod_containers: List[V1Container],
    pod_name: str,
    pod_namespace: str,
    v1_api: client.CoreV1Api,
    capture_previous_logs: bool = True,
    since_seconds: int = DAY_IN_SECONDS,
) -> List[dict]:
    processed = []

    capture_previous_log_runs = [False]
    if capture_previous_logs:
        capture_previous_log_runs.append(True)

    for container in pod_containers:
        for capture_previous in capture_previous_log_runs:
            try:
                logger_debug_previous = "previous run " if capture_previous else ""
                logger.debug(f"Reading {logger_debug_previous}log from pod {pod_name} container {container.name}")
                log: str = v1_api.read_namespaced_pod_log(
                    name=pod_name,
                    namespace=pod_namespace,
                    since_seconds=since_seconds,
                    container=container.name,
                    previous=capture_previous,
                )
                zinfo_previous_segment = "previous." if capture_previous else ""
                zinfo = f"{pod_namespace}/{directory_path}/pod.{pod_name}.{container.name}.{zinfo_previous_segment}log"
                processed.append(
                    {
                        "data": log,
                        "zinfo": zinfo,
                    }
                )
            except ApiException as e:
                logger.debug(e.body)

    return processed


def _process_kubernetes_resources(
    directory_path: str,
    resources: K8sRuntimeResources,
    kind: str,
    prefix_names: Optional[List[str]] = None,
    exclude_prefixes: Optional[List[str]] = None,
) -> List[dict]:
    processed = []
    if not prefix_names:
        prefix_names = []

    if exclude_prefixes:
        resources = exclude_resources_with_prefix(resources, exclude_prefixes)

    logger.info(f"Detected {len(resources.items)} {kind}s.")

    for resource in resources.items:
        r = resource
        r.api_version = resources.api_version
        r.kind = kind
        resource_metadata = r.metadata
        resource_namespace = resource_metadata.namespace
        resource_name = resource_metadata.name

        if prefix_names:
            matched_prefix = [resource_name.startswith(prefix) for prefix in prefix_names]
            if not any(matched_prefix):
                continue

        resource_type = _get_resource_type_prefix(kind)

        processed.append(
            {
                "data": generic.sanitize_for_serialization(obj=r),
                "zinfo": f"{resource_namespace}/{directory_path}/{resource_type}.{resource_name}.yaml",
            }
        )

    return processed


def _get_resource_type_prefix(kind: str) -> str:
    if len(kind) > 12:
        # Abbreviate long kinds using every capital letter in the kind.
        resource_type = "".join([c for c in kind if c.isupper()]).lower()
    else:
        resource_type = kind.lower()

    return resource_type


def exclude_resources_with_prefix(resources: K8sRuntimeResources, exclude_prefixes: List[str]) -> K8sRuntimeResources:
    for prefix in exclude_prefixes:
        resources.items = [resource for resource in resources.items if not resource.metadata.name.startswith(prefix)]
    return resources
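
# Behavior sketch for _get_resource_type_prefix: kinds longer than 12 characters
# collapse to their capital letters, shorter kinds simply lowercase:
#
#   _get_resource_type_prefix("PersistentVolumeClaim")  # -> "pvc"
#   _get_resource_type_prefix("Deployment")             # -> "deployment"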