azext_edge/edge/providers/support/connectors.py (93 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from functools import partial
from knack.log import get_logger
from .base import (
DAY_IN_SECONDS,
process_config_maps,
process_daemonsets,
process_deployments,
process_services,
process_v1_pods,
process_replicasets,
)
from .common import NAME_LABEL_FORMAT
logger = get_logger(__name__)
# TODO: more connectors will be added
SIMULATOR_PREFIX = "opcplc-"
OPC_PREFIX = "aio-opc-"
OPC_APP_LABEL = "app in (aio-opc-supervisor, aio-opc-admission-controller)"
OPC_NAME_LABEL = NAME_LABEL_FORMAT.format(label="aio-opc-opcua-connector, opcplc")
OPC_NAME_VAR_LABEL = "name in (aio-opc-asset-discovery)"
CONNECTORS_DIRECTORY_PATH = "connectors"
# TODO: once this label is stabled, we can remove the other labels
OPCUA_NAME_LABEL = NAME_LABEL_FORMAT.format(label="microsoft-iotoperations-opcuabroker")
def fetch_pods(since_seconds: int = DAY_IN_SECONDS):
opcua_pods = []
pod_name_labels = [
OPC_APP_LABEL,
OPC_NAME_LABEL,
OPC_NAME_VAR_LABEL,
]
for pod_name_label in pod_name_labels:
opcua_pods.extend(
process_v1_pods(
directory_path=CONNECTORS_DIRECTORY_PATH,
label_selector=pod_name_label,
since_seconds=since_seconds,
include_metrics=True,
)
)
opcua_pods.extend(
process_v1_pods(
directory_path=CONNECTORS_DIRECTORY_PATH,
label_selector=OPCUA_NAME_LABEL,
since_seconds=since_seconds,
include_metrics=True,
)
)
return opcua_pods
def fetch_deployments():
processed = process_deployments(
directory_path=CONNECTORS_DIRECTORY_PATH, prefix_names=[OPC_PREFIX, SIMULATOR_PREFIX]
)
processed.extend(process_deployments(directory_path=CONNECTORS_DIRECTORY_PATH, label_selector=OPC_NAME_LABEL))
processed.extend(process_deployments(directory_path=CONNECTORS_DIRECTORY_PATH, label_selector=OPCUA_NAME_LABEL))
return processed
def fetch_replicasets():
processed = process_replicasets(directory_path=CONNECTORS_DIRECTORY_PATH, label_selector=OPC_APP_LABEL)
processed.extend(process_replicasets(directory_path=CONNECTORS_DIRECTORY_PATH, label_selector=OPC_NAME_LABEL))
processed.extend(process_replicasets(directory_path=CONNECTORS_DIRECTORY_PATH, label_selector=OPCUA_NAME_LABEL))
return processed
def fetch_services():
processed = process_services(directory_path=CONNECTORS_DIRECTORY_PATH, label_selector=OPC_APP_LABEL)
processed.extend(process_services(directory_path=CONNECTORS_DIRECTORY_PATH, prefix_names=[SIMULATOR_PREFIX]))
processed.extend(process_services(directory_path=CONNECTORS_DIRECTORY_PATH, label_selector=OPCUA_NAME_LABEL))
return processed
def fetch_configmaps():
return process_config_maps(
directory_path=CONNECTORS_DIRECTORY_PATH,
label_selector=OPCUA_NAME_LABEL,
)
def fetch_daemonsets():
processed = process_daemonsets(
directory_path=CONNECTORS_DIRECTORY_PATH,
field_selector="metadata.name==aio-opc-asset-discovery",
)
processed.extend(
process_daemonsets(
directory_path=CONNECTORS_DIRECTORY_PATH,
label_selector=OPCUA_NAME_LABEL,
)
)
return processed
support_runtime_elements = {
"daemonsets": fetch_daemonsets,
"configmaps": fetch_configmaps,
"deployments": fetch_deployments,
"replicasets": fetch_replicasets,
"services": fetch_services,
}
def prepare_bundle(
log_age_seconds: int = DAY_IN_SECONDS,
) -> dict:
connectors_to_run = {}
connectors_to_run["pods"] = partial(fetch_pods, since_seconds=log_age_seconds)
connectors_to_run.update(support_runtime_elements)
return connectors_to_run