azext_edge/edge/providers/orchestration/targets.py (356 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from enum import IntEnum
from typing import Dict, List, NamedTuple, Optional, Set, Tuple
from azure.cli.core.azclierror import InvalidArgumentValueError
from ...common import (
DEFAULT_BROKER,
DEFAULT_BROKER_AUTHN,
DEFAULT_BROKER_LISTENER,
DEFAULT_DATAFLOW_ENDPOINT,
DEFAULT_DATAFLOW_PROFILE,
)
from ...util import parse_kvp_nargs, url_safe_hash_phrase
from ...util.az_client import parse_resource_id
from ..orchestration.common import (
TRUST_ISSUER_KIND_KEY,
TRUST_SETTING_KEYS,
)
from ..orchestration.resources.instances import parse_feature_kvp_nargs
from .common import KubernetesDistroType
from .template import (
TEMPLATE_BLUEPRINT_ENABLEMENT,
TEMPLATE_BLUEPRINT_INSTANCE,
TemplateBlueprint,
get_insecure_listener,
)
class InstancePhase(IntEnum):
    """Stages accepted by ``InitTargets.get_ops_instance_template`` to slice the instance template."""

    # Only the resources listed under PHASE_KEY_MAP[EXT] are kept in the template.
    EXT = 1
    # Keeps the EXT and INSTANCE keyed resources plus "customLocation";
    # the EXT resources and "customLocation" are reduced to read-only references.
    INSTANCE = 2
    # Keeps every resource; the EXT/INSTANCE keyed ones and "customLocation" become read-only references.
    RESOURCES = 3
# Template resource keys grouped by the phase they belong to.
# Keys are InstancePhase members (not plain strings). InstancePhase.RESOURCES has no
# entry: that phase keeps every resource and only marks the keys below as read-only.
PHASE_KEY_MAP: Dict[InstancePhase, Set[str]] = {
    InstancePhase.EXT: {"cluster", "aio_extension"},
    InstancePhase.INSTANCE: {"aioInstance", "aio_syncRule", "deviceRegistry_syncRule"},
}
class VarAttr(NamedTuple):
    """Describes one optional override of a template variable.

    ``template.content["variables"][template_key][moniker]`` is set to ``value``
    by the consumer, but only when ``value`` is truthy.
    """

    # Override to apply; may be None when the user supplied nothing for it.
    value: Optional[str]
    # Name of the variables sub-map to patch, e.g. "VERSIONS" or "TRAINS".
    template_key: str
    # Entry inside that sub-map, e.g. "containerStorage" or "secretStore".
    moniker: str
class InitTargets:
    """Holds user supplied deployment inputs and renders them into template content and parameters.

    The constructor normalizes the raw inputs (name sanitizing, key=value parsing, int coercion)
    and pre-computes the ``trust_config``, ``advanced_config`` and ``broker_config`` maps.
    ``get_ops_enablement_template`` and ``get_ops_instance_template`` then apply those values
    onto copies of the enablement / instance template blueprints.
    """

    def __init__(
        self,
        cluster_name: str,
        resource_group_name: str,
        schema_registry_resource_id: Optional[str] = None,
        cluster_namespace: str = "azure-iot-operations",
        location: Optional[str] = None,
        custom_location_name: Optional[str] = None,
        enable_rsync_rules: Optional[bool] = None,
        instance_name: Optional[str] = None,
        instance_description: Optional[str] = None,
        instance_features: Optional[List[str]] = None,
        tags: Optional[dict] = None,
        enable_fault_tolerance: Optional[bool] = None,
        # Extension config
        ops_config: Optional[List[str]] = None,
        ops_version: Optional[str] = None,
        ops_train: Optional[str] = None,
        acs_config: Optional[List[str]] = None,
        acs_version: Optional[str] = None,
        acs_train: Optional[str] = None,
        ssc_config: Optional[List[str]] = None,
        ssc_version: Optional[str] = None,
        ssc_train: Optional[str] = None,
        # Dataflow
        dataflow_profile_instances: int = 1,
        # Broker
        custom_broker_config: Optional[dict] = None,
        broker_memory_profile: Optional[str] = None,
        broker_service_type: Optional[str] = None,
        broker_backend_partitions: Optional[int] = None,
        broker_backend_workers: Optional[int] = None,
        broker_backend_redundancy_factor: Optional[int] = None,
        broker_frontend_workers: Optional[int] = None,
        broker_frontend_replicas: Optional[int] = None,
        add_insecure_listener: Optional[bool] = None,
        # Akri
        kubernetes_distro: str = KubernetesDistroType.k8s.value,
        container_runtime_socket: Optional[str] = None,
        # User Trust Config
        user_trust: Optional[bool] = None,
        trust_settings: Optional[List[str]] = None,
        **_,
    ):
        """Normalize and store the deployment inputs.

        Unknown keyword arguments are accepted and ignored (``**_``).

        Raises:
            InvalidArgumentValueError: from the trust settings / broker config validation
                performed by ``get_trust_settings_target_map`` and ``get_broker_config_target_map``.
        """
        self.cluster_name = cluster_name
        self.resource_group_name = resource_group_name

        # TODO - @digimaun
        if schema_registry_resource_id:
            # Return value is discarded; presumably called to reject malformed ids early - confirm.
            parse_resource_id(schema_registry_resource_id)
        self.schema_registry_resource_id = schema_registry_resource_id

        self.cluster_namespace = self._sanitize_k8s_name(cluster_namespace)
        self.location = location
        if not custom_location_name:
            # NOTE: the default name is derived from the namespace as passed in, not the sanitized one.
            custom_location_name = get_default_cl_name(
                resource_group_name=resource_group_name, cluster_name=cluster_name, namespace=cluster_namespace
            )

        self.custom_location_name = self._sanitize_k8s_name(custom_location_name)
        self.deploy_resource_sync_rules = bool(enable_rsync_rules)
        self.instance_name = self._sanitize_k8s_name(instance_name)
        self.instance_description = instance_description
        self.instance_features = parse_feature_kvp_nargs(instance_features, strict=True)
        self.tags = tags
        self.enable_fault_tolerance = enable_fault_tolerance

        # Extensions
        self.ops_config = parse_kvp_nargs(ops_config)
        self.ops_version = ops_version
        self.ops_train = ops_train
        self.acs_config = parse_kvp_nargs(acs_config)
        self.acs_version = acs_version
        self.acs_train = acs_train
        self.ssc_config = parse_kvp_nargs(ssc_config)
        self.ssc_version = ssc_version
        self.ssc_train = ssc_train

        # The two target maps below read user_trust / trust_settings / enable_fault_tolerance,
        # so those attributes must be assigned before the maps are built.
        self.user_trust = user_trust
        self.trust_settings = parse_kvp_nargs(trust_settings)
        self.trust_config = self.get_trust_settings_target_map()
        self.advanced_config = self.get_advanced_config_target_map()

        # Dataflow
        self.dataflow_profile_instances = self._sanitize_int(dataflow_profile_instances)

        # Broker
        self.add_insecure_listener = add_insecure_listener
        self.broker_memory_profile = broker_memory_profile
        self.broker_service_type = broker_service_type
        self.broker_backend_partitions = self._sanitize_int(broker_backend_partitions)
        self.broker_backend_workers = self._sanitize_int(broker_backend_workers)
        self.broker_backend_redundancy_factor = self._sanitize_int(broker_backend_redundancy_factor)
        self.broker_frontend_workers = self._sanitize_int(broker_frontend_workers)
        self.broker_frontend_replicas = self._sanitize_int(broker_frontend_replicas)
        # Reads the broker_* attributes assigned just above.
        self.broker_config = self.get_broker_config_target_map()
        self.custom_broker_config = custom_broker_config

        # Akri
        self.kubernetes_distro = kubernetes_distro
        self.container_runtime_socket = container_runtime_socket

    def _sanitize_k8s_name(self, name: Optional[str]) -> Optional[str]:
        """Lower-case ``name`` and replace underscores with hyphens; falsy input is returned unchanged."""
        if not name:
            return name
        sanitized = str(name)
        sanitized = sanitized.lower()
        sanitized = sanitized.replace("_", "-")
        return sanitized

    def _sanitize_int(self, value: Optional[int]) -> Optional[int]:
        """Coerce ``value`` with ``int()``; ``None`` is passed through."""
        if value is None:
            return value
        return int(value)

    def _handle_apply_targets(
        self, param_to_target: dict, template_blueprint: TemplateBlueprint
    ) -> Tuple[TemplateBlueprint, dict]:
        """Copy the blueprint and build the deployment parameters for it.

        Only entries whose name is declared in the template's parameters and whose value
        is not ``None`` are emitted, each wrapped as ``{"value": <target>}``.

        Returns:
            Tuple of (blueprint copy, deployment parameters).
        """
        template_copy = template_blueprint.copy()
        built_in_template_params = template_copy.parameters

        deploy_params = {}
        for param, target in param_to_target.items():
            if param in built_in_template_params and target is not None:
                deploy_params[param] = {"value": target}

        return template_copy, deploy_params

    def get_extension_versions(self, for_enablement: bool = True) -> dict:
        """Return ``{moniker: {"version": ..., "train": ...}}`` read from the template variables.

        Args:
            for_enablement: read the enablement template when True, otherwise the instance template.
        """
        version_map = {}
        get_template_method = self.get_ops_enablement_template
        if not for_enablement:
            get_template_method = self.get_ops_instance_template
        template, _ = get_template_method()
        template_vars = template["variables"]
        for moniker in template_vars["VERSIONS"]:
            version_map[moniker] = {"version": template_vars["VERSIONS"][moniker]}
        for moniker in template_vars["TRAINS"]:
            # setdefault: a moniker present only in TRAINS must not raise KeyError.
            version_map.setdefault(moniker, {})["train"] = template_vars["TRAINS"][moniker]
        return version_map

    def get_ops_enablement_template(
        self,
    ) -> Tuple[dict, dict]:
        """Render the enablement template.

        Returns:
            Tuple of (template content, deployment parameters).
        """
        template, parameters = self._handle_apply_targets(
            param_to_target={
                "clusterName": self.cluster_name,
                "trustConfig": self.trust_config,
                "advancedConfig": self.advanced_config,
            },
            template_blueprint=TEMPLATE_BLUEPRINT_ENABLEMENT,
        )

        # User supplied extension settings are layered over the defaults.
        base_acs_config = get_default_acs_config(enable_fault_tolerance=self.enable_fault_tolerance)
        if self.acs_config:
            base_acs_config.update(self.acs_config)
        template.content["resources"]["container_storage_extension"]["properties"][
            "configurationSettings"
        ] = base_acs_config

        base_ssc_config = get_default_ssc_config()
        if self.ssc_config:
            base_ssc_config.update(self.ssc_config)
        template.content["resources"]["secret_store_extension"]["properties"][
            "configurationSettings"
        ] = base_ssc_config

        # Version / train overrides are applied only when a value was provided.
        for var_attr in [
            VarAttr(value=self.acs_version, template_key="VERSIONS", moniker="containerStorage"),
            VarAttr(value=self.acs_train, template_key="TRAINS", moniker="containerStorage"),
            VarAttr(value=self.ssc_version, template_key="VERSIONS", moniker="secretStore"),
            VarAttr(value=self.ssc_train, template_key="TRAINS", moniker="secretStore"),
        ]:
            if var_attr.value:
                template.content["variables"][var_attr.template_key][var_attr.moniker] = var_attr.value

        if self.user_trust:
            # patch enablement template expecting full trust settings for source: CustomerManaged
            template.get_type_definition("_1.CustomerManaged")["properties"]["settings"]["nullable"] = True

        return template.content, parameters

    def get_ops_instance_template(
        self,
        cl_extension_ids: Optional[List[str]] = None,
        phase: Optional[InstancePhase] = None,
    ) -> Tuple[dict, dict]:
        """Render the instance template, optionally reduced to a deployment phase.

        Args:
            cl_extension_ids: passed as the "clExtentionIds" parameter; defaults to an empty list.
            phase: when None the full template is returned. EXT keeps only the EXT keyed resources.
                INSTANCE keeps the EXT + INSTANCE keyed resources and "customLocation", with the
                EXT ones and "customLocation" reduced to read-only references. RESOURCES keeps
                everything and reduces all of those tracked keys to read-only references.

        Returns:
            Tuple of (template content, deployment parameters).
        """
        if not cl_extension_ids:
            cl_extension_ids = []

        template, parameters = self._handle_apply_targets(
            param_to_target={
                "clusterName": self.cluster_name,
                "clusterNamespace": self.cluster_namespace,
                "clusterLocation": self.location,
                "kubernetesDistro": self.kubernetes_distro,
                "containerRuntimeSocket": self.container_runtime_socket,
                "customLocationName": self.custom_location_name,
                "clExtentionIds": cl_extension_ids,
                "deployResourceSyncRules": self.deploy_resource_sync_rules,
                "schemaRegistryId": self.schema_registry_resource_id,
                "defaultDataflowinstanceCount": self.dataflow_profile_instances,
                "brokerConfig": self.broker_config,
                "trustConfig": self.trust_config,
            },
            template_blueprint=TEMPLATE_BLUEPRINT_INSTANCE,
        )

        if self.ops_config:
            aio_default_config: Dict[str, str] = template.content["variables"]["defaultAioConfigurationSettings"]
            aio_default_config.update(self.ops_config)

        if self.ops_version:
            template.content["variables"]["VERSIONS"]["iotOperations"] = self.ops_version

        if self.ops_train:
            template.content["variables"]["TRAINS"]["iotOperations"] = self.ops_train

        instance = template.get_resource_by_key("aioInstance")
        broker = template.get_resource_by_key("broker")
        broker_authn = template.get_resource_by_key("broker_authn")
        broker_listener = template.get_resource_by_key("broker_listener")
        dataflow_profile = template.get_resource_by_key("dataflow_profile")
        dataflow_endpoint = template.get_resource_by_key("dataflow_endpoint")

        instance["properties"] = get_default_instance_config(
            description=self.instance_description, features=self.instance_features
        )

        if self.instance_name:
            # Child resource names are prefixed with the instance name, so all are renamed together.
            instance["name"] = self.instance_name
            broker["name"] = f"{self.instance_name}/{DEFAULT_BROKER}"
            broker_authn["name"] = f"{self.instance_name}/{DEFAULT_BROKER}/{DEFAULT_BROKER_AUTHN}"
            broker_listener["name"] = f"{self.instance_name}/{DEFAULT_BROKER}/{DEFAULT_BROKER_LISTENER}"
            dataflow_profile["name"] = f"{self.instance_name}/{DEFAULT_DATAFLOW_PROFILE}"
            dataflow_endpoint["name"] = f"{self.instance_name}/{DEFAULT_DATAFLOW_ENDPOINT}"

            template.content["outputs"]["aio"]["value"]["name"] = self.instance_name

        if self.tags:
            instance["tags"] = self.tags

        if self.custom_broker_config:
            # Accept either a full broker resource or just its properties.
            # NOTE: this rebinds self.custom_broker_config to the unwrapped properties.
            if "properties" in self.custom_broker_config:
                self.custom_broker_config = self.custom_broker_config["properties"]
            broker["properties"] = self.custom_broker_config

        if self.add_insecure_listener:
            template.add_resource(
                resource_key="broker_listener_insecure",
                resource_def=get_insecure_listener(instance_name=self.instance_name, broker_name=DEFAULT_BROKER),
            )

        resources: Dict[str, Dict[str, dict]] = template.content.get("resources", {})
        if phase == InstancePhase.EXT:
            del_if_not_in(resources, PHASE_KEY_MAP[InstancePhase.EXT])
            return template.content, parameters

        tracked_keys = (
            PHASE_KEY_MAP[InstancePhase.EXT].union(PHASE_KEY_MAP[InstancePhase.INSTANCE]).union({"customLocation"})
        )
        if phase == InstancePhase.INSTANCE:
            del_if_not_in(
                resources,
                tracked_keys,
            )
            set_read_only(resources, PHASE_KEY_MAP[InstancePhase.EXT].union({"customLocation"}))
            return template.content, parameters

        if phase == InstancePhase.RESOURCES:
            set_read_only(resources, tracked_keys)

        return template.content, parameters

    def get_broker_config_target_map(self):
        """Build the "brokerConfig" parameter value from the broker_* attributes.

        Attributes that are ``None`` are left out. Int values are checked against the
        ``minValue`` / ``maxValue`` declared for that key in the instance template's
        ``_1.BrokerConfig`` type definition; either bound may be absent.

        Returns:
            dict of the configured broker settings.

        Raises:
            InvalidArgumentValueError: one line per setting that is out of range.
        """
        to_process_config_map = {
            "frontendReplicas": self.broker_frontend_replicas,
            "frontendWorkers": self.broker_frontend_workers,
            "backendRedundancyFactor": self.broker_backend_redundancy_factor,
            "backendWorkers": self.broker_backend_workers,
            "backendPartitions": self.broker_backend_partitions,
            "memoryProfile": self.broker_memory_profile,
            "serviceType": self.broker_service_type,
        }
        processed_config_map = {}

        validation_errors = []
        broker_config_def = TEMPLATE_BLUEPRINT_INSTANCE.get_type_definition("_1.BrokerConfig")["properties"]
        for config, value in to_process_config_map.items():
            if value is None:
                continue
            processed_config_map[config] = value

            # Range validation applies only to int values with an "int" typed definition.
            if not broker_config_def or not isinstance(value, int):
                continue
            if config not in broker_config_def or broker_config_def[config].get("type") != "int":
                continue

            min_value = broker_config_def[config].get("minValue")
            max_value = broker_config_def[config].get("maxValue")
            # Check each bound independently: comparing against a missing (None) bound raises TypeError.
            below_min = min_value is not None and value < min_value
            above_max = max_value is not None and value > max_value
            if below_min or above_max:
                error_msg = f"{config} value range"
                # `is not None` so a bound of 0 is still reported.
                if min_value is not None:
                    error_msg += f" min:{min_value}"
                if max_value is not None:
                    error_msg += f" max:{max_value}"
                validation_errors.append(error_msg)

        if validation_errors:
            raise InvalidArgumentValueError("\n".join(validation_errors))

        return processed_config_map

    def get_advanced_config_target_map(self) -> dict:
        """Build the "advancedConfig" parameter value; empty unless fault tolerance is enabled."""
        processed_config_map = {}
        if self.enable_fault_tolerance:
            processed_config_map["edgeStorageAccelerator"] = {"faultToleranceEnabled": True}

        return processed_config_map

    def get_trust_settings_target_map(self) -> dict:
        """Build the "trustConfig" parameter value.

        The source is "CustomerManaged" when trust settings or the user trust flag were
        provided, otherwise "SelfSigned". When trust settings are provided every key in
        TRUST_SETTING_KEYS is required and copied into ``result["settings"]``.

        Raises:
            InvalidArgumentValueError: a required key is missing, or the issuer kind is not
                one of the allowed values declared by the enablement template.
        """
        source = "SelfSigned"
        if self.trust_settings or self.user_trust:
            source = "CustomerManaged"
        result = {"source": source}
        if self.trust_settings:
            target_settings: Dict[str, str] = {}
            trust_bundle_def = TEMPLATE_BLUEPRINT_ENABLEMENT.get_type_definition("_1.TrustBundleSettings")[
                "properties"
            ]
            allowed_issuer_kinds: Optional[List[str]] = trust_bundle_def.get(TRUST_ISSUER_KIND_KEY, {}).get(
                "allowedValues"
            )
            for key in TRUST_SETTING_KEYS:
                if key not in self.trust_settings:
                    raise InvalidArgumentValueError(f"{key} is a required trust setting/key.")
                if key == TRUST_ISSUER_KIND_KEY:
                    # Skipped when the template declares no allowed values.
                    if allowed_issuer_kinds and self.trust_settings[key] not in allowed_issuer_kinds:
                        raise InvalidArgumentValueError(f"{key} allowed values are {allowed_issuer_kinds}.")
                target_settings[key] = self.trust_settings[key]
            result["settings"] = target_settings

        return result
def get_default_cl_name(resource_group_name: str, cluster_name: str, namespace: str) -> str:
    """Return the default custom location name for a resource group, cluster and namespace.

    The name is "location-" followed by the first five characters of
    ``url_safe_hash_phrase`` applied to the three inputs concatenated in that order.
    """
    phrase = f"{resource_group_name}{cluster_name}{namespace}"
    hashed = url_safe_hash_phrase(phrase)
    suffix = hashed[:5]
    return "location-" + suffix
def set_read_only(resources: Dict[str, Dict[str, dict]], resource_keys: Set[str]):
    """Reduce the selected resources, in place, to references marked ``"existing": True``.

    For every key in ``resource_keys`` only the identifying fields ("type", "apiVersion",
    "name", "scope", "condition") are kept and ``"existing"`` is set to True.
    Keys not present in ``resources`` have no effect on it.
    """
    preserved_fields = {"type", "apiVersion", "name", "scope", "condition"}
    for resource_key in resource_keys:
        # A missing key yields a throwaway dict, so nothing is added to `resources`.
        resource: dict = resources.get(resource_key, {})
        removable = [field for field in resource if field not in preserved_fields]
        for field in removable:
            del resource[field]
        resource["existing"] = True
def del_if_not_in(resources: Dict[str, Dict[str, dict]], include_keys: Set[str]):
    """Remove, in place, every entry of ``resources`` whose key is not in ``include_keys``."""
    # Collect first so the dict is not mutated while being iterated.
    unwanted = [key for key in resources if key not in include_keys]
    for key in unwanted:
        del resources[key]
def get_default_acs_config(enable_fault_tolerance: bool = False) -> Dict[str, str]:
    """Return the default configuration settings for the container storage extension.

    With fault tolerance enabled the disk storage class changes and the acstor
    configuration entries are added.
    """
    if enable_fault_tolerance:
        disk_storage_class = "acstor-arccontainerstorage-storage-pool"
    else:
        disk_storage_class = "default,local-path"

    settings = {
        "edgeStorageConfiguration.create": "true",
        "feature.diskStorageClass": disk_storage_class,
    }
    if not enable_fault_tolerance:
        return settings

    settings["acstorConfiguration.create"] = "true"
    settings["acstorConfiguration.properties.diskMountPoint"] = "/mnt"
    return settings
def get_default_ssc_config() -> Dict[str, str]:
    """Return a fresh dict of default configuration settings for the secret store extension."""
    defaults: Dict[str, str] = {}
    defaults["rotationPollIntervalInSeconds"] = "120"
    defaults["validatingAdmissionPolicies.applyPolicies"] = "false"
    return defaults
def get_default_instance_config(description: Optional[str] = None, features: Optional[dict] = None) -> dict:
    """Return the instance "properties" payload.

    ``description`` and ``features`` are passed through as given (including None);
    the schema registry reference always points at the "schemaRegistryId" template parameter.
    """
    schema_registry_ref = {"resourceId": "[parameters('schemaRegistryId')]"}
    instance_properties = {}
    instance_properties["description"] = description
    instance_properties["schemaRegistryRef"] = schema_registry_ref
    instance_properties["features"] = features
    return instance_properties