azext_edge/edge/providers/support/schemaregistry.py (51 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from functools import partial
from knack.log import get_logger
from .base import (
DAY_IN_SECONDS,
process_config_maps,
process_persistent_volume_claims,
process_services,
process_statefulset,
process_v1_pods,
)
from .common import NAME_LABEL_FORMAT
# Module-level knack logger, named after this module.
logger = get_logger(__name__)
# Label selector passed to every fetcher below; produced by filling the shared
# NAME_LABEL_FORMAT template with the schema registry label value.
SCHEMAS_NAME_LABEL = NAME_LABEL_FORMAT.format(label="microsoft-iotoperations-schemas")
# Value passed as ``directory_path`` to every process_* helper below.
SCHEMAS_DIRECTORY_PATH = "schemaregistry"
def fetch_stateful_sets():
    """Gather the schema registry StatefulSets.

    Delegates to ``process_statefulset`` using the schema registry label
    selector and directory path.

    Returns:
        The value returned by ``process_statefulset``, unmodified.
    """
    selection = {
        "directory_path": SCHEMAS_DIRECTORY_PATH,
        "label_selector": SCHEMAS_NAME_LABEL,
    }
    return process_statefulset(**selection)
def fetch_pods(since_seconds: int = DAY_IN_SECONDS):
    """Gather the schema registry pods.

    Delegates to ``process_v1_pods`` using the schema registry label
    selector and directory path.

    Args:
        since_seconds: Forwarded unchanged to ``process_v1_pods`` as
            ``since_seconds``; defaults to ``DAY_IN_SECONDS``.

    Returns:
        The value returned by ``process_v1_pods``, unmodified.
    """
    pod_query = {
        "directory_path": SCHEMAS_DIRECTORY_PATH,
        "label_selector": SCHEMAS_NAME_LABEL,
        "since_seconds": since_seconds,
    }
    return process_v1_pods(**pod_query)
def fetch_config_map():
    """Gather the schema registry ConfigMaps.

    Delegates to ``process_config_maps`` using the schema registry label
    selector and directory path.

    Returns:
        The value returned by ``process_config_maps``, unmodified.
    """
    config_maps = process_config_maps(
        label_selector=SCHEMAS_NAME_LABEL,
        directory_path=SCHEMAS_DIRECTORY_PATH,
    )
    return config_maps
def fetch_services():
    """Gather the schema registry Services.

    Delegates to ``process_services`` using the schema registry label
    selector and directory path.

    Returns:
        The value returned by ``process_services``, unmodified.
    """
    services = process_services(
        label_selector=SCHEMAS_NAME_LABEL,
        directory_path=SCHEMAS_DIRECTORY_PATH,
    )
    return services
def fetch_persistent_volume_claims():
    """Gather the schema registry PersistentVolumeClaims.

    Delegates to ``process_persistent_volume_claims`` using the schema
    registry label selector and directory path.

    Returns:
        The value returned by ``process_persistent_volume_claims``,
        unmodified.
    """
    selection = {
        "directory_path": SCHEMAS_DIRECTORY_PATH,
        "label_selector": SCHEMAS_NAME_LABEL,
    }
    return process_persistent_volume_claims(**selection)
# Maps a resource-kind key to the no-argument fetcher defined above for that
# kind. The pods fetcher is not listed here because it takes a per-call
# ``since_seconds`` value; ``prepare_bundle`` supplies that entry.
support_runtime_elements = {
    "statefulsets": fetch_stateful_sets,
    "configmaps": fetch_config_map,
    "services": fetch_services,
    "persistentvolumeclaims": fetch_persistent_volume_claims,
}
def prepare_bundle(log_age_seconds: int = DAY_IN_SECONDS) -> dict:
    """Assemble the schema registry fetchers to run for a support bundle.

    Args:
        log_age_seconds: Bound to ``fetch_pods`` as ``since_seconds``;
            defaults to ``DAY_IN_SECONDS``.

    Returns:
        A new dict containing every entry of ``support_runtime_elements``
        plus a ``"pods"`` entry: ``fetch_pods`` partially applied with
        ``since_seconds=log_age_seconds``.
    """
    # Copy the shared mapping rather than writing "pods" into it. This keeps
    # the module-level ``support_runtime_elements`` free of per-call state, so
    # one call's log age cannot linger there for later readers.
    schemas_to_run = dict(support_runtime_elements)
    schemas_to_run["pods"] = partial(fetch_pods, since_seconds=log_age_seconds)
    return schemas_to_run