azext_edge/edge/providers/support/meta.py (64 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from functools import partial
from typing import Iterable, Optional
from knack.log import get_logger
from azext_edge.edge.providers.support.billing import BILLING_RESOURCE_KIND, AIO_USAGE_PREFIX
from ..edge_api import META_API_V1, EdgeResourceApi
from .base import (
DAY_IN_SECONDS,
assemble_crd_work,
process_deployments,
process_jobs,
process_replicasets,
process_services,
process_v1_pods,
)
from .common import NAME_LABEL_FORMAT
# Module-level logger named after this module.
logger = get_logger(__name__)

# Label selector built by filling NAME_LABEL_FORMAT with the meta API's label.
META_NAME_LABEL = NAME_LABEL_FORMAT.format(label=META_API_V1.label)
# The meta API moniker; handed to the process_* helpers below as their directory_path.
META_DIRECTORY_PATH = META_API_V1.moniker
# Passed (wrapped in a list) as prefix_names when fetching services.
META_PREFIX_NAMES = "aio-operator"
def fetch_deployments():
    """
    Run process_deployments for the meta API.

    The call is scoped with the meta label selector and directory path, and the
    usage prefix and billing resource kind are passed as exclude_prefixes.

    Returns whatever process_deployments returns.
    """
    excluded = [AIO_USAGE_PREFIX, BILLING_RESOURCE_KIND]
    return process_deployments(
        directory_path=META_DIRECTORY_PATH,
        label_selector=META_NAME_LABEL,
        exclude_prefixes=excluded,
    )
def fetch_replicasets():
    """
    Run process_replicasets for the meta API.

    Uses the meta directory path and label selector, and passes the usage
    prefix and billing resource kind as exclude_prefixes.

    Returns whatever process_replicasets returns.
    """
    call_kwargs = {
        "directory_path": META_DIRECTORY_PATH,
        "label_selector": META_NAME_LABEL,
        "exclude_prefixes": [AIO_USAGE_PREFIX, BILLING_RESOURCE_KIND],
    }
    return process_replicasets(**call_kwargs)
def fetch_pods(since_seconds: int = DAY_IN_SECONDS):
    """
    Run process_v1_pods for the meta API.

    Args:
        since_seconds: Forwarded unchanged to process_v1_pods as since_seconds;
            defaults to DAY_IN_SECONDS.

    Returns whatever process_v1_pods returns.
    """
    excluded = [AIO_USAGE_PREFIX, BILLING_RESOURCE_KIND]
    return process_v1_pods(
        directory_path=META_DIRECTORY_PATH,
        label_selector=META_NAME_LABEL,
        since_seconds=since_seconds,
        exclude_prefixes=excluded,
    )
def fetch_services():
    """
    Run process_services for the meta API.

    In addition to the meta label selector, META_PREFIX_NAMES is supplied as the
    single entry of prefix_names, and the usage prefix and billing resource kind
    are passed as exclude_prefixes.

    Returns whatever process_services returns.
    """
    wanted_prefixes = [META_PREFIX_NAMES]
    excluded = [AIO_USAGE_PREFIX, BILLING_RESOURCE_KIND]
    return process_services(
        directory_path=META_DIRECTORY_PATH,
        label_selector=META_NAME_LABEL,
        prefix_names=wanted_prefixes,
        exclude_prefixes=excluded,
    )
def fetch_jobs():
    """
    Run process_jobs for the meta API.

    Uses the meta directory path and label selector, and passes the usage
    prefix and billing resource kind as exclude_prefixes.

    Returns whatever process_jobs returns.
    """
    call_kwargs = {
        "directory_path": META_DIRECTORY_PATH,
        "label_selector": META_NAME_LABEL,
        "exclude_prefixes": [AIO_USAGE_PREFIX, BILLING_RESOURCE_KIND],
    }
    return process_jobs(**call_kwargs)
# Maps a resource-kind key to the zero-argument fetcher defined for it in this module.
# "pods" is not listed here: fetch_pods takes a log age, which prepare_bundle binds per call.
support_runtime_elements = {
    "deployments": fetch_deployments,
    "replicasets": fetch_replicasets,
    "services": fetch_services,
    "jobs": fetch_jobs,
}
def prepare_bundle(log_age_seconds: int = DAY_IN_SECONDS, apis: Optional[Iterable[EdgeResourceApi]] = None) -> dict:
    """
    Assemble the mapping of work items to run for the meta support bundle.

    Args:
        log_age_seconds: Bound as since_seconds on the pods fetcher; defaults to
            DAY_IN_SECONDS.
        apis: Optional APIs handed to assemble_crd_work. When falsy, no CRD work
            is added.

    Returns:
        A new dict holding the result of assemble_crd_work (if apis were given),
        followed by the entries of support_runtime_elements and a "pods" entry
        that calls fetch_pods with the requested log age.
    """
    meta_to_run = {}

    if apis:
        meta_to_run.update(assemble_crd_work(apis))

    meta_to_run.update(support_runtime_elements)
    # Set "pods" on the returned mapping instead of on the shared module-level dict,
    # so a call does not leave its log age behind in module state.
    meta_to_run["pods"] = partial(fetch_pods, since_seconds=log_age_seconds)

    return meta_to_run