azext_edge/edge/providers/support/arccontainerstorage.py (110 lines of code) (raw):

# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
"""Support-bundle work items for the Arc container storage namespaces.

Each ``fetch_*`` function delegates to the matching ``process_*`` helper from
``.base``; ``prepare_bundle`` assembles them (plus optional CRD work) into a
single mapping of work-item name to callable.
"""

from functools import partial
from typing import Iterable, Optional

from knack.log import get_logger

from ..edge_api import ARCCONTAINERSTORAGE_API_V1, CONTAINERSTORAGE_API_V1, EdgeResourceApi
from .base import (
    DAY_IN_SECONDS,
    assemble_crd_work,
    process_config_maps,
    process_daemonsets,
    process_deployments,
    process_persistent_volume_claims,
    process_replicasets,
    process_services,
    process_v1_pods,
)

logger = get_logger(__name__)

# Bundle directory paths are taken from the monikers of the two API definitions.
STORAGE_DIRECTORY_PATH = ARCCONTAINERSTORAGE_API_V1.moniker
ACSTOR_DIRECTORY_PATH = CONTAINERSTORAGE_API_V1.moniker

# TODO: Use common label once it is ready
STORAGE_NAMESPACE = "azure-arc-containerstorage"
ACSTOR_NAMESPACE = "azure-arc-acstor"


def _process_in_both_namespaces(processor, **kwargs):
    """Run ``processor`` for the storage namespace, then for the acstor namespace.

    Args:
        processor: One of the ``process_*`` helpers; it is called with
            ``directory_path`` and ``namespace`` keyword arguments.
        **kwargs: Extra keyword arguments forwarded unchanged to both calls.

    Returns:
        The object returned by the first call, extended in place with the
        items returned by the second call (storage results come first).
    """
    processed = processor(
        directory_path=STORAGE_DIRECTORY_PATH,
        namespace=STORAGE_NAMESPACE,
        **kwargs,
    )
    processed.extend(
        processor(
            directory_path=ACSTOR_DIRECTORY_PATH,
            namespace=ACSTOR_NAMESPACE,
            **kwargs,
        )
    )
    return processed


def fetch_deployments():
    """Return ``process_deployments`` results for both namespaces, combined."""
    return _process_in_both_namespaces(process_deployments)


def fetch_replicasets():
    """Return ``process_replicasets`` results for both namespaces, combined."""
    return _process_in_both_namespaces(process_replicasets)


def fetch_pods(since_seconds: int = DAY_IN_SECONDS):
    """Return ``process_v1_pods`` results for both namespaces, combined.

    Args:
        since_seconds: Forwarded as ``since_seconds`` to ``process_v1_pods``
            for each namespace. Defaults to ``DAY_IN_SECONDS``.
    """
    return _process_in_both_namespaces(process_v1_pods, since_seconds=since_seconds)


def fetch_daemonsets():
    """Return ``process_daemonsets`` results for both namespaces, combined."""
    return _process_in_both_namespaces(process_daemonsets)


def fetch_services():
    """Return ``process_services`` results for both namespaces, combined."""
    return _process_in_both_namespaces(process_services)


# NOTE: the misspelling ("peristent") is part of the existing public name and is
# kept so that any external references keep working.
def fetch_peristent_volume_claims():
    """Return ``process_persistent_volume_claims`` results for the storage namespace only."""
    return process_persistent_volume_claims(
        directory_path=STORAGE_DIRECTORY_PATH,
        namespace=STORAGE_NAMESPACE,
    )


def fetch_configmaps():
    """Return ``process_config_maps`` results for the acstor namespace only."""
    return process_config_maps(
        directory_path=ACSTOR_DIRECTORY_PATH,
        namespace=ACSTOR_NAMESPACE,
    )


# Work items that take no per-call arguments. The "pods" item depends on the
# requested log age, so prepare_bundle adds it to its own result instead.
support_runtime_elements = {
    "configmaps": fetch_configmaps,
    "daemonsets": fetch_daemonsets,
    "deployments": fetch_deployments,
    "persistentvolumeclaims": fetch_peristent_volume_claims,
    "replicasets": fetch_replicasets,
    "services": fetch_services,
}


def prepare_bundle(
    log_age_seconds: int = DAY_IN_SECONDS,
    apis: Optional[Iterable[EdgeResourceApi]] = None,
) -> dict:
    """Build the mapping of work-item name to callable for this provider.

    Args:
        log_age_seconds: Bound as ``since_seconds`` on the ``"pods"`` work item.
        apis: When truthy, the result of ``assemble_crd_work`` for these APIs
            (with ``STORAGE_NAMESPACE`` as fallback namespace) is included.

    Returns:
        A new dict containing the optional CRD work, every entry of
        ``support_runtime_elements``, and a ``"pods"`` entry.
    """
    acs_to_run = {}

    if apis:
        acs_to_run.update(assemble_crd_work(apis=apis, fallback_namespace=STORAGE_NAMESPACE))

    acs_to_run.update(support_runtime_elements)
    # Add the per-call pods item to the local result only: writing it into the
    # module-level support_runtime_elements would leak this call's log age
    # into shared state.
    acs_to_run["pods"] = partial(fetch_pods, since_seconds=log_age_seconds)

    return acs_to_run