azext_edge/edge/providers/orchestration/clone.py (1,217 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from copy import deepcopy
from enum import Enum
from json import dumps
from pathlib import Path, PurePath
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Union
from uuid import uuid4
from azure.cli.core.azclierror import ValidationError
from azure.core.exceptions import HttpResponseError
from knack.log import get_logger
from packaging.version import parse as parse_version
from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
TimeElapsedColumn,
)
from rich.table import Table, box
from ....constants import VERSION as CLI_VERSION
from ...util import (
chunk_list,
should_continue_prompt,
to_safe_filename,
)
from ...util.az_client import (
REGISTRY_API_VERSION,
get_msi_mgmt_client,
get_resource_client,
wait_for_terminal_state,
)
from ...util.id_tools import is_valid_resource_id, parse_resource_id
from .common import (
CLONE_INSTANCE_VERS_MAX,
CLONE_INSTANCE_VERS_MIN,
CONTRIBUTOR_ROLE_ID,
CUSTOM_LOCATIONS_API_VERSION,
EXTENSION_TYPE_ACS,
EXTENSION_TYPE_OPS,
EXTENSION_TYPE_PLATFORM,
EXTENSION_TYPE_SSC,
EXTENSION_TYPE_TO_MONIKER_MAP,
)
from .common import (
CloneSummaryMode as SummaryMode,
)
from .common import (
CloneTemplateMode as TemplateMode,
)
from .connected_cluster import ConnectedCluster
from .resources import Instances
from .resources.instances import (
SERVICE_ACCOUNT_DATAFLOW,
SERVICE_ACCOUNT_SECRETSYNC,
get_fc_name,
)
if TYPE_CHECKING:
from azure.core.polling import LROPoller
logger = get_logger(__name__)
DEFAULT_CONSOLE = Console()
DEPLOYMENT_CHUNK_LEN = 800
DEPLOYMENT_DATA_SIZE_KB = 1024
class StateResourceKey(Enum):
    """Logical keys identifying each category of captured resource in the clone state."""

    CL = "customLocation"
    INSTANCE = "instance"
    BROKER = "broker"
    LISTENER = "listener"
    AUTHN = "authn"
    AUTHZ = "authz"
    PROFILE = "dataflowProfile"
    ENDPOINT = "dataflowEndpoint"
    DATAFLOW = "dataflow"
    ASSET = "asset"
    ASSET_ENDPOINT_PROFILE = "assetEndpointProfile"
    SSC_SPC = "secretProviderClass"
    SSC_SECRETSYNC = "secretSync"
    ROLE_ASSIGNMENT = "roleAssignment"
    FEDERATE = "identityFederation"
class TemplateParams(Enum):
    """Names of the ARM template parameters emitted/consumed by the clone template."""

    INSTANCE_NAME = "instanceName"
    CLUSTER_NAME = "clusterName"
    CLUSTER_NAMESPACE = "clusterNamespace"
    CUSTOM_LOCATION_NAME = "customLocationName"
    OPS_EXTENSION_NAME = "opsExtensionName"
    SUBSCRIPTION = "subscription"
    RESOURCEGROUP = "resourceGroup"
    SCHEMA_REGISTRY_ID = "schemaRegistryId"
    PRINCIPAL_ID = "principalId"
    RESOURCE_SLUG = "resourceSlug"
# Reusable ARM template expressions keyed by logical name. Entries containing a
# '{}' placeholder ("instanceNestedName", "extensionId") are str.format templates
# that callers complete with a concrete suffix/extension name.
TEMPLATE_EXPRESSION_MAP = {
    "instanceName": f"[parameters('{TemplateParams.INSTANCE_NAME.value}')]",
    "instanceNestedName": (f"[concat(parameters('{TemplateParams.INSTANCE_NAME.value}'), " "'{}')]"),
    "clusterName": f"[parameters('{TemplateParams.CLUSTER_NAME.value}')]",
    "clusterNamespace": f"[parameters('{TemplateParams.CLUSTER_NAMESPACE.value}')]",
    "clusterId": (
        "[resourceId('Microsoft.Kubernetes/connectedClusters', " f"parameters('{TemplateParams.CLUSTER_NAME.value}'))]"
    ),
    "customLocationName": f"[parameters('{TemplateParams.CUSTOM_LOCATION_NAME.value}')]",
    "customLocationId": (
        "[resourceId('Microsoft.ExtendedLocation/customLocations', "
        f"parameters('{TemplateParams.CUSTOM_LOCATION_NAME.value}'))]"
    ),
    "extensionId": (
        "[concat(resourceId('Microsoft.Kubernetes/connectedClusters', "
        f"parameters('{TemplateParams.CLUSTER_NAME.value}')), "
        "'/providers/Microsoft.KubernetesConfiguration/extensions/{})]"
    ),
    "opsExtensionName": f"[parameters('{TemplateParams.OPS_EXTENSION_NAME.value}')]",
    "schemaRegistryId": f"[parameters('{TemplateParams.SCHEMA_REGISTRY_ID.value}')]",
}
def get_resource_id_expr(rtype: str, resource_id: str, for_instance: bool = True) -> str:
    """
    Build an ARM resourceId() template expression for the resource behind resource_id.

    When for_instance is True the root name segment is the instanceName template
    parameter (so the expression follows the deploy-time instance name); otherwise
    the literal root name parsed from resource_id is used. Child resource names
    parsed from the Id are appended in hierarchy order.

    The original implementation computed an initial quoted segment and then
    unconditionally overwrote it in the for_instance branch; the dead work is removed.
    """
    id_meta = parse_resource_id(resource_id)
    if for_instance:
        target_name = f"parameters('{TemplateParams.INSTANCE_NAME.value}')"
    else:
        target_name = f"'{id_meta['name']}'"
    # Append nested child segments (e.g. broker -> listener) in declaration order.
    last_child_num = id_meta.get("last_child_num", 0)
    for i in range(1, last_child_num + 1):
        target_name += f", '{id_meta[f'child_name_{i}']}'"
    return f"[resourceId('{rtype}', {target_name})]"
def get_resource_id_by_parts(rtype: str, *args) -> str:
    """
    Build an ARM resourceId() expression from a resource type plus name segments.

    Each segment is single-quoted. If a segment contains a concat() expression the
    outermost pair of quotes is stripped so the expression is evaluated rather
    than treated as a literal.
    """

    def _strip_outer_quotes(text: str, ch: str) -> str:
        lo = text.find(ch)
        hi = text.rfind(ch)
        if lo == -1 or lo == hi:
            return text
        return text[:lo] + text[lo + 1 : hi] + text[hi + 1 :]

    name_parts = "".join(f", '{segment}'" for segment in args)
    # TODO: very hacky
    if "concat(" in name_parts:
        name_parts = _strip_outer_quotes(name_parts, "'")
    return f"[resourceId('{rtype}'{name_parts})]"
def get_resource_id_by_param(rtype: str, param: TemplateParams) -> str:
    """Build an ARM resourceId() expression whose single name segment is a template parameter."""
    param_expr = f"parameters('{param.value}')"
    return f"[resourceId('{rtype}', {param_expr})]"
def get_ops_extension_name(extension_names: List[str]) -> Optional[str]:
    """
    Return the first IoT Operations extension name from a list of extension Ids/names.

    The platform extension ('azure-iot-operations-platform') is excluded. Returns
    None when no matching name is found.
    """
    for candidate in extension_names:
        leaf = candidate.rsplit("/", 1)[-1]
        is_ops = leaf.startswith("azure-iot-operations-") and not leaf.endswith("platform")
        if is_ops:
            return leaf
    return None
class DeploymentContainer:
    """
    An abstraction for an ARM deployment resource, which deploys a set of resources.

    Resources are registered via add_resources() and rendered as a nested
    Microsoft.Resources/deployments payload by get(). Parameters given to the
    container are surfaced both as input parameter values and as inner template
    parameter declarations.
    """

    def __init__(
        self,
        name: str,
        api_version: str = "2022-09-01",
        parameters: Optional[dict] = None,
        depends_on: Optional[Union[Iterable[str], str]] = None,
        resource_group: Optional[str] = None,
        subscription: Optional[str] = None,
    ):
        self.name = name
        # Ordered map of state key -> wrapped resource; order drives emission order.
        self.rcontainer_map: Dict[str, "ResourceContainer"] = {}
        self.api_version = api_version
        self.parameters = parameters
        self.depends_on = depends_on
        # Normalize a single dependency string into a set for uniform handling.
        if isinstance(self.depends_on, str):
            self.depends_on = {self.depends_on}
        self.resource_group = resource_group
        self.subscription = subscription

    def add_resources(
        self,
        key: Union[StateResourceKey, str],
        api_version: str,
        data_iter: Iterable[dict],
        depends_on: Optional[List[Union[StateResourceKey, str]]] = None,
        config: Optional[Dict[str, Any]] = None,
    ):
        """
        Wrap each resource from data_iter in a ResourceContainer keyed off `key`.

        The first resource uses the bare key; subsequent ones are suffixed _2, _3, ...
        All resources in one call share the same (processed) dependsOn list.
        """
        if isinstance(key, StateResourceKey):
            key = key.value
        # NOTE(review): process_depends_on is defined elsewhere in this module — it
        # appears to convert keys into ARM dependsOn references; confirm semantics there.
        depends_on = process_depends_on(depends_on)
        to_enumerate = list(data_iter)
        count = 0
        for resource in to_enumerate:
            count += 1
            suffix = "" if count <= 1 else f"_{count}"
            target_key = f"{key}{suffix}"
            self.rcontainer_map[target_key] = ResourceContainer(
                api_version=api_version,
                resource_state=resource,
                depends_on=depends_on,
                config=config,
            )

    def get(self):
        """
        Render this container as a Microsoft.Resources/deployments resource dict.

        Emits an inline incremental-mode template containing every registered
        resource, plus optional resourceGroup, parameters and dependsOn fields.
        """
        result = {
            "type": "Microsoft.Resources/deployments",
            "apiVersion": self.api_version,
            "name": self.name,
            "properties": {
                "mode": "Incremental",
                "template": {
                    "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
                    "contentVersion": "1.0.0.0",
                    "resources": [r.get() for r in list(self.rcontainer_map.values())],
                },
            },
        }
        if self.resource_group:
            result["resourceGroup"] = self.resource_group
        if self.subscription:
            # TODO: verify
            # result["subscription"] = self.subscription
            pass
        if self.parameters:
            # Split each parameter into the outer deployment's input value and the
            # inner template's parameter declaration. Parameters without an explicit
            # "value" are forwarded from the parent scope via [parameters('<name>')].
            input_param_map = {}
            template_param_map = {}
            for param in self.parameters:
                target_value = (
                    self.parameters[param]["value"]
                    if "value" in self.parameters[param]
                    else f"[parameters('{param}')]"
                )
                input_param_map[param] = {"value": target_value}
                template_param_map[param] = {"type": self.parameters[param]["type"]}
            result["properties"]["parameters"] = input_param_map
            result["properties"]["template"]["parameters"] = template_param_map
        if self.depends_on:
            result["dependsOn"] = list(self.depends_on)
        return result
class ResourceContainer:
    """
    Wraps a single captured ARM resource payload and renders a template-ready form.

    get() rewrites names and custom-location references to template expressions,
    then prunes read-only, service-populated state (ids, provisioning status,
    identity principalId) before emitting. The rewrite must run before pruning
    because the nested-name rewrite reads the resource "id".
    """

    def __init__(
        self,
        api_version: str,
        resource_state: dict,
        depends_on: Optional[Iterable[str]] = None,
        config: Optional[Dict[str, Any]] = None,
    ):
        self.api_version = api_version
        self.resource_state = resource_state
        if depends_on:
            depends_on = list(depends_on)
        self.depends_on = depends_on
        if not config:
            config = {}
        # Per-resource render options; currently only "apply_nested_name" is read.
        self.config = config

    def _prune_resource(self):
        """Drop read-only top-level and properties keys that are invalid in a deployment."""
        filter_keys = {
            "id",
            "systemData",
        }
        self.resource_state = self._prune_resource_keys(filter_keys=filter_keys, resource=self.resource_state)
        filter_keys = {
            "provisioningState",
            "currentVersion",
            "statuses",
            "status",
        }
        # NOTE(review): assumes every captured resource carries a 'properties' bag —
        # a KeyError here would indicate an unexpected resource shape; verify upstream.
        self.resource_state["properties"] = self._prune_resource_keys(
            filter_keys=filter_keys, resource=self.resource_state["properties"]
        )

    def _prune_identity(self):
        """Remove the service-assigned principalId from any identity block."""
        filter_keys = {"principalId"}
        if "identity" in self.resource_state:
            self.resource_state["identity"] = self._prune_resource_keys(
                filter_keys=filter_keys, resource=self.resource_state["identity"]
            )

    @classmethod
    def _prune_resource_keys(cls, filter_keys: set, resource: dict) -> dict:
        """Return a shallow copy of resource without the keys in filter_keys."""
        result = {}
        for key in resource:
            if key not in filter_keys:
                result[key] = resource[key]
        return result

    def _apply_cl_ref(self):
        """Point any extendedLocation at the templated custom location Id."""
        if "extendedLocation" in self.resource_state:
            self.resource_state["extendedLocation"]["name"] = TEMPLATE_EXPRESSION_MAP["customLocationId"]

    def _apply_nested_name(self):
        """
        Rebuild the resource "name" as a slash-joined parent/child path parsed from
        its Id; for instance-rooted resources substitute the instanceName parameter.
        """

        def _extract_suffix(path: str) -> str:
            return "/" + path.partition("/")[2]

        if "id" in self.resource_state:
            test: Dict[str, Union[str, int]] = parse_resource_id(self.resource_state["id"])
            target_name = test["name"]
            last_child_num = test.get("last_child_num", 0)
            if last_child_num:
                for i in range(1, last_child_num + 1):
                    target_name += f"/{test[f'child_name_{i}']}"
            self.resource_state["name"] = target_name
            if test["type"].lower() == "instances":
                suffix = _extract_suffix(target_name)
                # "/" means the resource IS the instance; otherwise it is nested under it.
                if suffix == "/":
                    self.resource_state["name"] = TEMPLATE_EXPRESSION_MAP["instanceName"]
                else:
                    self.resource_state["name"] = TEMPLATE_EXPRESSION_MAP["instanceNestedName"].format(suffix)

    def get(self):
        """Render the template-ready resource dict (rewrites, then prunes, then emits)."""
        apply_nested_name = self.config.get("apply_nested_name", True)
        if apply_nested_name:
            self._apply_nested_name()
        self._apply_cl_ref()
        self._prune_identity()
        self._prune_resource()
        result = {
            "apiVersion": self.api_version,
            **self.resource_state,
        }
        if self.depends_on:
            result["dependsOn"] = self.depends_on
        return result
class InstanceRestore:
    """
    Responsible for deploying the cloned instance template to a target cluster.
    This class handles federation of credentials if they exist on the model instance.
    """

    def __init__(
        self,
        cmd,
        instances: Instances,
        instance_record: dict,
        namespace: str,
        parsed_cluster_id: Dict[str, str],
        template_content: "TemplateContent",
        user_assigned_mis: Optional[List[str]] = None,
        # TODO eliminate mode, only use split_content
        template_mode: Optional[str] = None,
        no_progress: Optional[bool] = None,
    ):
        self.cmd = cmd
        self.instances = instances
        self.instance_record = instance_record
        self.namespace = namespace
        self.template_content = template_content
        self.parsed_cluster_id = parsed_cluster_id
        # Target coordinates are derived from the parsed target cluster resource Id.
        self.cluster_name = self.parsed_cluster_id["name"]
        self.resource_group_name = self.parsed_cluster_id["resource_group"]
        self.subscription_id = self.parsed_cluster_id["subscription"]
        self.connected_cluster = ConnectedCluster(
            cmd=self.cmd,
            subscription_id=self.subscription_id,
            cluster_name=self.cluster_name,
            resource_group_name=self.resource_group_name,
        )
        self.resource_client = get_resource_client(subscription_id=self.subscription_id)
        self.template_mode = template_mode
        self.user_assigned_mis = user_assigned_mis
        self.no_progress = no_progress

    def _deploy_template(
        self,
        content: dict,
        parameters: dict,
        deployment_name: str,
    ) -> Optional["LROPoller"]:
        """Start an incremental ARM deployment in the target resource group; return its poller."""
        deployment_params = {"properties": {"mode": "Incremental", "template": content, "parameters": parameters}}
        # Correlation Id + command name aid service-side tracing of clone deployments.
        headers = {"x-ms-correlation-request-id": str(uuid4()), "CommandName": "iot ops clone"}
        return self.resource_client.deployments.begin_create_or_update(
            resource_group_name=self.resource_group_name,
            deployment_name=deployment_name,
            parameters=deployment_params,
            headers=headers,
        )

    def _handle_federation(self, use_self_hosted_issuer: Optional[bool] = None):
        """
        Federate the target cluster's OIDC issuer with each user-assigned managed
        identity associated with the source instance. No-op when there are none.
        Federation failures are logged at debug level (best attempt).
        """
        if not self.user_assigned_mis:
            return
        cluster_resource = self.connected_cluster.resource
        oidc_issuer = self.instances._ensure_oidc_issuer(
            cluster_resource, use_self_hosted_issuer=use_self_hosted_issuer
        )
        for mid in self.user_assigned_mis:
            parsed_uami_id = parse_resource_id(mid)
            # Each identity may live in a different subscription; client per identity.
            msi_client = get_msi_mgmt_client(subscription_id=parsed_uami_id["subscription"])
            credentials = list(
                msi_client.federated_identity_credentials.list(
                    resource_group_name=parsed_uami_id["resource_group"], resource_name=parsed_uami_id["name"]
                )
            )
            # We need an efficient way to federate credentials that are in scope.
            # First we need to build context of prior federation. We enumerate existing credentials
            # for every uami that is associated with the instance to build a dict of issuer to service account pairs
            # and separately a map of service accounts in play. We then iterate through desired/target issuer
            # to service account pairs to see if they are already present, where if not AND the service account
            # is in play we federate with best attempt.
            cred_map = {}
            cluster_svc_acct_map = {}
            expected_creds = [(oidc_issuer, SERVICE_ACCOUNT_SECRETSYNC), (oidc_issuer, SERVICE_ACCOUNT_DATAFLOW)]
            for cred in credentials:
                # Subject format is system:serviceaccount:<namespace>:<account>; keep the account.
                svc_acct = cred["properties"]["subject"].split(":")[-1]
                cred_map[(cred["properties"]["issuer"], svc_acct)] = 1
                cluster_svc_acct_map[svc_acct] = 1
            for exp_cred in expected_creds:
                if exp_cred not in cred_map and exp_cred[1] in cluster_svc_acct_map:
                    subject = f"system:serviceaccount:{self.namespace}:{exp_cred[1]}"
                    try:
                        # Federate with best attempt.
                        msi_client.federated_identity_credentials.create_or_update(
                            resource_group_name=parsed_uami_id["resource_group"],
                            resource_name=parsed_uami_id["name"],
                            federated_identity_credential_resource_name=get_fc_name(
                                cluster_name=self.cluster_name,
                                oidc_issuer=oidc_issuer,
                                subject=subject,
                            ),
                            parameters={
                                "properties": {
                                    "subject": subject,
                                    "audiences": ["api://AzureADTokenExchange"],
                                    "issuer": oidc_issuer,
                                }
                            },
                        )
                    except HttpResponseError as e:
                        logger.debug(e)

    def deploy(
        self,
        instance_name: Optional[str] = None,
        use_self_hosted_issuer: Optional[bool] = None,
    ):
        """
        Deploy the clone template to the target cluster.

        Handles identity federation first, then submits one deployment per content
        page (linked template mode splits content into multiple pages, executed
        sequentially; single-page deployments are not awaited).

        :raises ValidationError: if the target cluster is not connected to Azure.
        """
        if not self.connected_cluster.connected:
            raise ValidationError(f"Cluster {self.connected_cluster.cluster_name} is not connected to Azure.")
        parameters = {
            "clusterName": {"value": self.cluster_name},
        }
        if instance_name:
            parameters["instanceName"] = {"value": instance_name}
        deployment_name = default_bundle_name(self.instance_record["name"])
        DEFAULT_CONSOLE.print()
        deployment_work = []
        if self.template_mode == TemplateMode.LINKED.value:
            deployment_work.extend(self.template_content.get_split_content())
        else:
            deployment_work.append(self.template_content.content)
        total_pages = len(deployment_work)
        with DEFAULT_CONSOLE.status("Preparing replication...") as console:
            self._handle_federation(use_self_hosted_issuer)
            # TODO: Show warnings if they exist from federation
            for i in range(total_pages):
                status = f"Replicating {deployment_name} {i+1}/{total_pages}"
                console.update(status=status)
                # Multi-page runs suffix each deployment name with its page number.
                page = f"_{i+1}" if total_pages > 1 else ""
                poller = self._deploy_template(
                    content=deployment_work[i],
                    parameters=parameters,
                    deployment_name=f"{deployment_name}{page}",
                )
                deployment_link = self._get_deployment_link(deployment_name=f"{deployment_name}{page}")
                DEFAULT_CONSOLE.print(
                    f"->[link={deployment_link}]Link to {self.cluster_name} deployment {i+1}/{total_pages}[/link]",
                    highlight=False,
                )
                # Pages must complete in order; only wait when there is more than one.
                if total_pages > 1:
                    wait_for_terminal_state(poller)
            DEFAULT_CONSOLE.print()

    # TODO: re-use with work module
    def _get_deployment_link(self, deployment_name: str) -> str:
        """Build the Azure portal deep link for a deployment in the target scope."""
        return (
            "https://portal.azure.com/#blade/HubsExtension/DeploymentDetailsBlade/id/"
            f"%2Fsubscriptions%2F{self.subscription_id}%2FresourceGroups%2F{self.resource_group_name}"
            f"%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2F{deployment_name}"
        )
def clone_instance(
    cmd,
    instance_name: str,
    resource_group_name: str,
    summary_mode: Optional[str] = None,
    to_dir: Optional[str] = None,
    template_mode: Optional[str] = None,
    to_instance_name: Optional[str] = None,
    to_cluster_id: Optional[str] = None,
    use_self_hosted_issuer: Optional[bool] = None,
    linked_base_uri: Optional[str] = None,
    no_progress: Optional[bool] = None,
    confirm_yes: Optional[bool] = None,
    force: Optional[bool] = None,
    **_,
):
    """
    Command entry point for cloning an IoT Operations instance.

    Analyzes the source instance, renders a capture summary, then optionally
    writes the clone template bundle to disk (--to-dir) and/or replicates the
    instance to another connected cluster (--to-cluster-id). With neither target
    supplied the command is analysis/summary only.

    :raises ValidationError: when to_cluster_id is not a valid ARM resource Id.
    """
    parsed_cluster_id = {}
    if to_cluster_id:
        # Validate the replication target before doing any analysis work.
        if not is_valid_resource_id(to_cluster_id):
            raise ValidationError(f"Invalid resource Id: {to_cluster_id}")
        parsed_cluster_id = parse_resource_id(to_cluster_id)
    clone_manager = CloneManager(
        cmd=cmd,
        instance_name=instance_name,
        resource_group_name=resource_group_name,
        no_progress=no_progress,
    )
    bundle_path = get_bundle_path(instance_name, bundle_dir=to_dir)
    clone_state = clone_manager.analyze_cluster(force)
    if not no_progress:
        render_clone_table(
            clone_state=clone_state,
            bundle_path=bundle_path,
            parsed_cluster_id=parsed_cluster_id,
            detailed=summary_mode == SummaryMode.DETAILED.value,
        )
    # Summary-only invocation: nothing to write and nowhere to deploy.
    if not to_dir and not parsed_cluster_id:
        return
    should_bail = not should_continue_prompt(confirm_yes=confirm_yes, context="Clone")
    if should_bail:
        return
    template_content = clone_state.get_content()
    template_content.write(bundle_path, template_mode=template_mode, linked_base_uri=linked_base_uri)
    if parsed_cluster_id:
        restore_client = clone_state.get_restore_client(
            parsed_cluster_id=parsed_cluster_id, template_mode=template_mode, no_progress=no_progress
        )
        restore_client.deploy(instance_name=to_instance_name, use_self_hosted_issuer=use_self_hosted_issuer)
def render_clone_table(
    clone_state: "CloneState",
    bundle_path: Optional[PurePath] = None,
    parsed_cluster_id: Optional[Dict[str, str]] = None,
    detailed: bool = False,
):
    """
    Print the capture summary table plus follow-up notes (save path, federation
    caveat, replication target) to the default console.
    """
    table = get_default_table(include_name=detailed)
    total = 0
    for rtype, captured in clone_state.resources.items():
        rtype_len = len(captured)
        total += rtype_len
        row_content = [f"{rtype}", f"{rtype_len}"]
        if detailed:
            # Detailed mode adds a newline-separated column of resource names.
            row_content.append("\n".join(r["resource_name"] for r in captured))
        table.add_row(*row_content)
    table.title += f" of {clone_state.instance_record['name']}\nTotal resources {total}"
    DEFAULT_CONSOLE.print(table)
    if bundle_path:
        DEFAULT_CONSOLE.print(f"State will be saved to:\n-> {bundle_path}\n")
    if clone_state.user_assigned_mis and not parsed_cluster_id:
        DEFAULT_CONSOLE.print(
            ":exclamation: Credential federation of user-assigned managed "
            "identity is currently only supported using --to-cluster-id."
        )
    if parsed_cluster_id:
        DEFAULT_CONSOLE.print(
            f"Clone will be replicated to connected cluster:\n"
            f"* Name: {parsed_cluster_id['name']}\n"
            f"* Resource Group: {parsed_cluster_id['resource_group']}\n"
            f"* Subscription: {parsed_cluster_id['subscription']}\n",
            highlight=False,
        )
def get_default_table(include_name: bool = False) -> Table:
    """Construct the rich table used for the capture summary, optionally with a Name column."""
    headers = ["Resource Type", "#"]
    if include_name:
        headers.append("Name")
    table = Table(
        box=box.MINIMAL,
        expand=False,
        title="Capture",
        min_width=79,
        show_footer=True,
    )
    for header in headers:
        table.add_column(header)
    return table
class CloneState:
    """
    Result of a clone analysis: the captured resources, the rendered template
    content, and everything needed to construct a restore client for replication.
    """

    def __init__(
        self,
        cmd,
        # Annotation corrected from str: this is the instance resource payload (dict).
        instance_record: dict,
        instances: Instances,
        namespace: str,
        resources: dict,
        template_gen: "TemplateGen",
        user_assigned_mis: Optional[List[str]] = None,
    ):
        self.cmd = cmd
        self.instance_record = instance_record
        self.instances = instances
        self.namespace = namespace
        self.resources = resources
        self.template_gen = template_gen
        # Template content is rendered eagerly at construction time.
        self.content = self.template_gen.get_content()
        self.user_assigned_mis = user_assigned_mis

    def get_content(self) -> "TemplateContent":
        """Return the rendered template content produced during construction."""
        return self.content

    def get_restore_client(
        self,
        parsed_cluster_id: Dict[str, str],
        template_mode: Optional[str] = None,
        no_progress: Optional[bool] = None,
    ) -> "InstanceRestore":
        """Build an InstanceRestore targeting the cluster described by parsed_cluster_id."""
        return InstanceRestore(
            cmd=self.cmd,
            instances=self.instances,
            instance_record=self.instance_record,
            namespace=self.namespace,
            parsed_cluster_id=parsed_cluster_id,
            template_content=self.content,
            user_assigned_mis=self.user_assigned_mis,
            template_mode=template_mode,
            no_progress=no_progress,
        )
class CloneManager:
"""
Encompasses the components for analyzing an instance and preparing it for cloning.
"""
def __init__(
    self,
    cmd,
    resource_group_name: str,
    instance_name: str,
    no_progress: Optional[bool] = None,
):
    """
    Fetch the instance and its associated custom location / resource map,
    and initialize the mutable state populated by the _analyze_* passes.
    """
    self.cmd = cmd
    self.instance_name = instance_name
    self.resource_group_name = resource_group_name
    self.no_progress = no_progress
    self.instances = Instances(self.cmd)
    self.instance_record = self.instances.show(
        name=self.instance_name, resource_group_name=self.resource_group_name
    )
    self.version_guru = VersionGuru(self.instance_record)
    self.custom_location = self.instances._get_associated_cl(self.instance_record)
    self.resource_map = self.instances.get_resource_map(self.instance_record)
    # NOTE: attribute name preserves an existing typo ("resouce"); other methods rely on it.
    self.resouce_graph = self.resource_map.connected_cluster.resource_graph
    # Root resource containers keyed by state key; filled by the _analyze_* passes.
    self.rcontainer_map: Dict[str, ResourceContainer] = {}
    self.parameter_map: dict = {}
    self.variable_map: dict = {}
    self.metadata_map: dict = {}
    # Resource Ids of user-assigned managed identities discovered on the instance.
    self.instance_identities: List[str] = []
    # Tracks deployment names registered per state key, for cross-deployment dependsOn.
    self.active_deployment: Dict[StateResourceKey, List[str]] = {}
def analyze_cluster(self, force: Optional[bool] = None) -> "CloneState":
    """
    This method analyzes the connected cluster and prepares the resources for cloning.
    Ensure compatibility with the instance version and build the necessary parameters and metadata.

    :param force: when True, bypasses the instance version compatibility gate.
    :return: a CloneState snapshot ready for template write and/or replication.
    """
    with Progress(
        SpinnerColumn("star"),
        *Progress.get_default_columns(),
        "Elapsed:",
        TimeElapsedColumn(),
        transient=True,
        disable=bool(self.no_progress),
    ) as progress:
        _ = progress.add_task(f"Analyzing {self.instance_name}...", total=None)
        self.version_guru.ensure_compat(force)
        self._build_parameters()
        self._build_metadata()
        # Order matters: later passes reference deployments registered by earlier ones.
        self._analyze_extensions()
        self._analyze_instance()
        self._analyze_instance_identity()
        self._analyze_instance_resources()
        self._analyze_secretsync()
        self._analyze_assets()
        return CloneState(
            cmd=self.cmd,
            instance_record=self.instance_record,
            instances=self.instances,
            namespace=self.custom_location["properties"]["namespace"],
            resources=self._enumerate_resources(),
            template_gen=TemplateGen(
                self.rcontainer_map, self.parameter_map, self.variable_map, self.metadata_map
            ),
            user_assigned_mis=self.instance_identities,
        )
def _enumerate_resources(self):
    """
    Walk the container map (recursing into nested deployments) and group the
    parsed resource Ids by lowercase 'namespace/type[/resource_type]' key.
    """
    enumerated_map: dict = {}

    def _walk(container_map: Dict[str, ResourceContainer]):
        for entry in container_map.values():
            if isinstance(entry, ResourceContainer):
                # Resources without an Id (e.g. synthesized payloads) are not counted.
                if "id" not in entry.resource_state:
                    continue
                parsed_id = parse_resource_id(entry.resource_state["id"].lower())
                type_key = f"{parsed_id['namespace']}/{parsed_id['type']}"
                has_sub_type = "resource_type" in parsed_id and parsed_id["resource_type"] != parsed_id["type"]
                if has_sub_type:
                    type_key += f"/{parsed_id['resource_type']}"
                enumerated_map.setdefault(type_key, []).append(parsed_id)
            if isinstance(entry, DeploymentContainer):
                _walk(entry.rcontainer_map)

    _walk(self.rcontainer_map)
    return enumerated_map
def _build_parameters(self):
    """
    Register the top-level template parameters. clusterName has no default and
    must be supplied at deploy time; the rest default to values captured from
    the source instance or to ARM expressions.
    """
    param_specs = [
        {"name": TemplateParams.CLUSTER_NAME.value},
        {
            "name": TemplateParams.CLUSTER_NAMESPACE.value,
            "default": self.custom_location["properties"]["namespace"],
        },
        {
            "name": TemplateParams.CUSTOM_LOCATION_NAME.value,
            "default": self.custom_location["name"],
        },
        {
            "name": TemplateParams.INSTANCE_NAME.value,
            "default": self.instance_record["name"],
        },
        {
            # Falls back to a deterministic generated name when the source ops
            # extension name cannot be determined from the custom location.
            "name": TemplateParams.OPS_EXTENSION_NAME.value,
            "default": (
                get_ops_extension_name(self.custom_location["properties"].get("clusterExtensionIds", []))
                or "[format('azure-iot-operations-{0}', parameters('resourceSlug'))]"
            ),
        },
        {
            "name": TemplateParams.RESOURCE_SLUG.value,
            "default": (
                "[take(uniqueString(resourceGroup().id, "
                "parameters('clusterName'), parameters('clusterNamespace')), 5)]"
            ),
        },
    ]
    for spec in param_specs:
        self.parameter_map.update(build_parameter(**spec))
def _build_metadata(self):
    """Record clone provenance: the producing CLI version and the source instance Id."""
    self.metadata_map.update(
        {
            "opsCliVersion": CLI_VERSION,
            "clonedInstanceId": self.instance_record["id"],
        }
    )
def get_resources_of_type(self, resource_type: str) -> List[dict]:
    """
    Query Azure Resource Graph for resources of resource_type bound to the
    instance's extended location (custom location).
    """
    return self.resouce_graph.query_resources(
        f"""
        resources
        | where extendedLocation.name =~ '{self.instance_record["extendedLocation"]["name"]}'
        | where type =~ '{resource_type}'
        | project id, name, type, location, extendedLocation, properties
        """
    )["data"]
def get_identities_by_client_id(self, client_ids: List[str]) -> List[dict]:
    """
    Query Azure Resource Graph for user-assigned managed identities whose
    clientId matches any of the provided client_ids (case-insensitive).
    """
    return self.resouce_graph.query_resources(
        f"""
        resources
        | where type =~ "Microsoft.ManagedIdentity/userAssignedIdentities"
        | where properties.clientId in~ ("{'", "'.join(client_ids)}")
        | project id, name, type, properties
        """
    )["data"]
def _analyze_extensions(self):
    """
    Capture the cluster extensions (platform, ACS, SSC, ops) as root resources,
    wiring install-order dependencies: platform first, then ACS/SSC, then ops.
    """
    depends_on_map = {
        EXTENSION_TYPE_SSC: [EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_PLATFORM]],
        EXTENSION_TYPE_ACS: [
            EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_PLATFORM],
        ],
        EXTENSION_TYPE_OPS: [
            EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_PLATFORM],
            EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_ACS],
            EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_SSC],
        ],
    }
    api_version = (
        self.resource_map.connected_cluster.clusters.extensions.clusterconfig_mgmt_client._config.api_version
    )
    extension_map = self.resource_map.connected_cluster.get_extensions_by_type(
        EXTENSION_TYPE_PLATFORM, EXTENSION_TYPE_ACS, EXTENSION_TYPE_SSC, EXTENSION_TYPE_OPS
    )
    for extension_type in extension_map:
        extension_moniker = EXTENSION_TYPE_TO_MONIKER_MAP[extension_type]
        depends_on = depends_on_map.get(extension_type)
        # Extensions deploy against the templated cluster scope, not a fixed Id.
        extension_map[extension_type]["scope"] = TEMPLATE_EXPRESSION_MAP["clusterId"]
        # The ops extension name is parameterized so a clone can rename it.
        if extension_moniker == EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_OPS]:
            extension_map[extension_type]["name"] = TEMPLATE_EXPRESSION_MAP["opsExtensionName"]
        self._add_resource(
            key=extension_moniker,
            api_version=api_version,
            data=extension_map[extension_type],
            depends_on=depends_on,
            config={"apply_nested_name": False},
        )
def _analyze_instance(self):
    """
    Capture the custom location and the instance resource, then register a
    nested deployment assigning the contributor role over the schema registry
    (in the registry's own resource group/subscription).
    """
    api_version = self.version_guru.get_instance_api()
    custom_location = deepcopy(self.custom_location)
    # Re-point the custom location at templated cluster/namespace/name values.
    custom_location["properties"]["hostResourceId"] = TEMPLATE_EXPRESSION_MAP["clusterId"]
    custom_location["properties"]["namespace"] = TEMPLATE_EXPRESSION_MAP["clusterNamespace"]
    custom_location["name"] = TEMPLATE_EXPRESSION_MAP["customLocationName"]
    cl_extension_ids = []
    cl_monikers = [
        EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_PLATFORM],
        EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_SSC],
        EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_OPS],
    ]
    # Rebuild clusterExtensionIds as template expressions, only for extensions
    # actually captured by _analyze_extensions.
    for moniker in cl_monikers:
        ext_resource = self.rcontainer_map.get(moniker)
        if not ext_resource:
            continue
        if moniker == EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_OPS]:
            cl_extension_ids.append(
                TEMPLATE_EXPRESSION_MAP["extensionId"].format("', parameters('opsExtensionName')")
            )
        else:
            cl_extension_ids.append(
                TEMPLATE_EXPRESSION_MAP["extensionId"].format(f"{ext_resource.resource_state['name']}'")
            )
    custom_location["properties"]["clusterExtensionIds"] = cl_extension_ids
    custom_location["properties"]["displayName"] = TEMPLATE_EXPRESSION_MAP["customLocationName"]
    # Custom location needs to be treated as a root resource.
    self._add_resource(
        key=StateResourceKey.CL,
        api_version=CUSTOM_LOCATIONS_API_VERSION,
        data=custom_location,
        config={"apply_nested_name": False},
        depends_on=cl_monikers,
    )
    self._add_resource(
        key=StateResourceKey.INSTANCE,
        api_version=api_version,
        data=deepcopy(self.instance_record),
        depends_on=StateResourceKey.CL,
    )
    nested_params = {
        **build_parameter(name=TemplateParams.CLUSTER_NAME.value),
        **build_parameter(name=TemplateParams.INSTANCE_NAME.value),
        **build_parameter(
            name=TemplateParams.PRINCIPAL_ID.value,
            # Resolves the deployed instance's system-assigned identity at deploy time.
            value="[reference('iotOperations', '2023-05-01', 'Full').identity.principalId]",
        ),
        **build_parameter(
            name=TemplateParams.SCHEMA_REGISTRY_ID.value,
            value=self.instance_record["properties"]["schemaRegistryRef"]["resourceId"],
        ),
    }
    parsed_sr_id = parse_resource_id(self.instance_record["properties"]["schemaRegistryRef"]["resourceId"])
    # Providing resource_group means a separate deployment to that resource group.
    self._add_deployment(
        key=StateResourceKey.ROLE_ASSIGNMENT,
        api_version="2022-04-01",
        data_iter=[get_role_assignment()],
        depends_on=EXTENSION_TYPE_TO_MONIKER_MAP[EXTENSION_TYPE_OPS],
        parameters=nested_params,
        resource_group=parsed_sr_id["resource_group"],
        subscription=parsed_sr_id["subscription"],
    )
def _analyze_instance_resources(self):
    """
    Capture the broker hierarchy (broker, authn, authz, listeners) and dataflow
    resources (endpoints, profiles, dataflows) as ordered nested deployments.
    """
    api_version = self.version_guru.get_instance_api()
    brokers_iter = self.instances.iotops_mgmt_client.broker.list_by_resource_group(
        resource_group_name=self.resource_group_name, instance_name=self.instance_name
    )
    # Let us keep things simple atm
    # NOTE(review): assumes at least one broker exists; an empty list raises IndexError here.
    default_broker = list(brokers_iter)[0]
    self._add_resource(
        key=StateResourceKey.BROKER,
        api_version=api_version,
        data=default_broker,
        depends_on=StateResourceKey.INSTANCE,
    )
    # Initial dependencies
    nested_params = {
        **build_parameter(name=TemplateParams.CUSTOM_LOCATION_NAME.value),
        **build_parameter(name=TemplateParams.INSTANCE_NAME.value),
    }
    broker_resource_id_expr = get_resource_id_expr(rtype=default_broker["type"], resource_id=default_broker["id"])
    # authN
    self._add_deployment(
        key=StateResourceKey.AUTHN,
        api_version=api_version,
        data_iter=self.instances.iotops_mgmt_client.broker_authentication.list_by_resource_group(
            resource_group_name=self.resource_group_name,
            instance_name=self.instance_name,
            broker_name=default_broker["name"],
        ),
        depends_on=broker_resource_id_expr,
        parameters=nested_params,
    )
    # authZ
    self._add_deployment(
        key=StateResourceKey.AUTHZ,
        api_version=api_version,
        data_iter=self.instances.iotops_mgmt_client.broker_authorization.list_by_resource_group(
            resource_group_name=self.resource_group_name,
            instance_name=self.instance_name,
            broker_name=default_broker["name"],
        ),
        depends_on=broker_resource_id_expr,
        parameters=nested_params,
    )
    # listener
    # Listeners must wait on whichever authn/authz deployments were actually registered.
    listener_depends_on = []
    for active in self.active_deployment:
        if active in [StateResourceKey.AUTHN, StateResourceKey.AUTHZ]:
            listener_depends_on.append(
                get_resource_id_by_parts("Microsoft.Resources/deployments", self.active_deployment[active][-1])
            )
    self._add_deployment(
        key=StateResourceKey.LISTENER,
        api_version=api_version,
        data_iter=self.instances.iotops_mgmt_client.broker_listener.list_by_resource_group(
            resource_group_name=self.resource_group_name,
            instance_name=self.instance_name,
            broker_name=default_broker["name"],
        ),
        depends_on=listener_depends_on,
        parameters=nested_params,
    )
    instance_resource_id_expr = get_resource_id_by_param(
        "microsoft.iotoperations/instances", TemplateParams.INSTANCE_NAME
    )
    # endpoint
    self._add_deployment(
        key=StateResourceKey.ENDPOINT,
        api_version=api_version,
        data_iter=self.instances.iotops_mgmt_client.dataflow_endpoint.list_by_resource_group(
            resource_group_name=self.resource_group_name, instance_name=self.instance_name
        ),
        depends_on=instance_resource_id_expr,
        parameters=nested_params,
    )
    # profile
    profile_iter = list(
        self.instances.iotops_mgmt_client.dataflow_profile.list_by_resource_group(
            resource_group_name=self.resource_group_name, instance_name=self.instance_name
        )
    )
    self._add_deployment(
        key=StateResourceKey.PROFILE,
        api_version=api_version,
        data_iter=profile_iter,
        depends_on=instance_resource_id_expr,
        parameters=nested_params,
    )
    # dataflow
    # Dataflows are gathered across all profiles and wait on both the profile and
    # endpoint deployments.
    # NOTE(review): assumes PROFILE and ENDPOINT deployments were registered; a missing
    # key here would raise KeyError — confirm _add_deployment always registers them.
    if profile_iter:
        dataflows = []
        for profile in profile_iter:
            dataflows.extend(
                self.instances.iotops_mgmt_client.dataflow.list_by_profile_resource(
                    resource_group_name=self.resource_group_name,
                    instance_name=self.instance_name,
                    dataflow_profile_name=profile["name"],
                )
            )
        self._add_deployment(
            key=StateResourceKey.DATAFLOW,
            api_version=api_version,
            data_iter=dataflows,
            depends_on=[
                get_resource_id_by_parts(
                    "Microsoft.Resources/deployments", self.active_deployment[StateResourceKey.PROFILE][-1]
                ),
                get_resource_id_by_parts(
                    "Microsoft.Resources/deployments", self.active_deployment[StateResourceKey.ENDPOINT][-1]
                ),
            ],
            parameters=nested_params,
        )
def _analyze_assets(self):
nested_params = {
**build_parameter(name=TemplateParams.CUSTOM_LOCATION_NAME.value),
}
instance_resource_id_expr = get_resource_id_by_param(
"microsoft.iotoperations/instances", TemplateParams.INSTANCE_NAME
)
asset_endpoints = self.get_resources_of_type(resource_type="microsoft.deviceregistry/assetendpointprofiles")
self._add_deployment(
key=StateResourceKey.ASSET_ENDPOINT_PROFILE,
api_version=REGISTRY_API_VERSION,
data_iter=asset_endpoints,
depends_on=[
instance_resource_id_expr,
get_resource_id_by_parts(
"Microsoft.Resources/deployments",
self.active_deployment[StateResourceKey.LISTENER][-1],
),
],
parameters=nested_params,
)
# TODO: Should this not wait on AEP?
assets = self.get_resources_of_type(resource_type="microsoft.deviceregistry/assets")
if assets and asset_endpoints:
self._add_deployment(
key=StateResourceKey.ASSET,
api_version=REGISTRY_API_VERSION,
data_iter=assets,
depends_on=get_resource_id_by_parts(
"Microsoft.Resources/deployments",
self.active_deployment[StateResourceKey.ASSET_ENDPOINT_PROFILE][-1],
),
parameters=nested_params,
)
def _analyze_secretsync(self):
nested_params = {
**build_parameter(name=TemplateParams.CUSTOM_LOCATION_NAME.value),
}
ssc_client = self.instances.ssc_mgmt_client
ssc_api_version = ssc_client._config.api_version
instance_resource_id_expr = get_resource_id_by_param(
"microsoft.iotoperations/instances", TemplateParams.INSTANCE_NAME
)
ext_loc_id = self.instance_record["extendedLocation"]["name"].lower()
ssc_spcs = list(
ssc_client.azure_key_vault_secret_provider_classes.list_by_resource_group(
resource_group_name=self.resource_group_name
)
)
ssc_spcs = [spc for spc in ssc_spcs if spc["extendedLocation"]["name"].lower() == ext_loc_id]
client_ids = [spc["properties"]["clientId"] for spc in ssc_spcs if "clientId" in spc["properties"]]
if client_ids:
self.instance_identities.extend([mid["id"] for mid in self.get_identities_by_client_id(client_ids)])
self._add_deployment(
key=StateResourceKey.SSC_SPC,
api_version=ssc_api_version,
data_iter=ssc_spcs,
depends_on=instance_resource_id_expr,
parameters=nested_params,
)
ssc_secretsyncs = list(
ssc_client.secret_syncs.list_by_resource_group(resource_group_name=self.resource_group_name)
)
ssc_secretsyncs = [
secretsync
for secretsync in ssc_secretsyncs
if secretsync["extendedLocation"]["name"].lower() == ext_loc_id
]
if ssc_secretsyncs and ssc_spcs:
self._add_deployment(
key=StateResourceKey.SSC_SECRETSYNC,
api_version=ssc_api_version,
data_iter=ssc_secretsyncs,
depends_on=get_resource_id_by_parts(
"Microsoft.Resources/deployments", self.active_deployment[StateResourceKey.SSC_SPC][-1]
),
parameters=nested_params,
)
def _analyze_instance_identity(self):
target_instance = getattr(self.rcontainer_map[StateResourceKey.INSTANCE.value], "resource_state", {})
identity: dict = target_instance.get("identity", {}).get("userAssignedIdentities", {})
for rid in identity:
identity[rid] = {}
self.instance_identities.append(rid)
def _add_deployment_by_key(self, key: StateResourceKey) -> Tuple[str, str]:
deployments_by_key = self.active_deployment.get(key, [])
symbolic_name = f"{key.value}s_{len(deployments_by_key)+1}"
deployment_name = f"concat(parameters('resourceSlug'), '_{symbolic_name}')"
deployments_by_key.append(deployment_name)
self.active_deployment[key] = deployments_by_key
return symbolic_name, deployment_name
def _add_deployment(
self,
key: StateResourceKey,
api_version: str,
data_iter: Iterable,
depends_on: Optional[Union[str, Iterable[str]]] = None,
parameters: Optional[dict] = None,
resource_group: Optional[str] = None,
subscription: Optional[str] = None,
):
data_iter = list(data_iter)
if data_iter:
chunked_list_data = chunk_list(
data=data_iter, chunk_len=DEPLOYMENT_CHUNK_LEN, data_size=DEPLOYMENT_DATA_SIZE_KB
)
for chunk in chunked_list_data:
symbolic_name, deployment_name = self._add_deployment_by_key(key)
deployment_container = DeploymentContainer(
name=f"[{deployment_name}]",
depends_on=depends_on,
parameters=parameters,
resource_group=resource_group,
subscription=subscription,
)
deployment_container.add_resources(
key=key,
api_version=api_version,
data_iter=chunk,
)
# Root deployments have root resources, which may be deployments, which in turn deploy resources
self.rcontainer_map[symbolic_name] = deployment_container
def _add_resource(
self,
key: Union[StateResourceKey, str],
api_version: str,
data: dict,
depends_on: Optional[Union[Iterable[str], str]] = None,
config: Optional[Dict[str, Any]] = None,
):
if isinstance(key, StateResourceKey):
key = key.value
depends_on = process_depends_on(depends_on)
self.rcontainer_map[key] = ResourceContainer(
api_version=api_version,
resource_state=data,
depends_on=depends_on,
config=config,
)
class TemplateContent:
    """
    Manages application of template content.

    Wraps a generated ARM template dict and supports splitting tracked nested
    deployments out of it — either for serial deployment (restore path) or for
    writing linked templates to disk.
    """

    def __init__(self, content: dict):
        # The wrapped template; only exposed as a deepcopy via the `content` property.
        self._content = content
        # Nested deployment resource types eligible for splitting/linking, mapped
        # to a running count used to derive unique linked template file names.
        self.linked_type_map = {
            "microsoft.deviceregistry/assets": 0,
            "microsoft.deviceregistry/assetendpointprofiles": 0,
        }
        # Root template parameters copied into each split nested template so it
        # can be deployed standalone.
        self.copy_params = {"clusterName", "customLocationName", "instanceName"}

    @property
    def content(self) -> dict:
        # Deepcopy so callers can mutate the result without corrupting internal state.
        return deepcopy(self._content)

    def get_split_content(self) -> List[dict]:
        """
        Used with the instance restore client. The root template and template for each
        nested deployment (in consideration) gets separated and added to a queue to be deployed serially.
        """
        content = self.content
        result = [content]
        parameters = content.get("parameters", {})
        resources: Dict[str, Dict[str, dict]] = content.get("resources", {})
        # Iterate over a snapshot of keys since matching entries are deleted below.
        for key in list(resources.keys()):
            if resources[key].get("type", "").lower() != "microsoft.resources/deployments":
                continue
            nested_resources: List[dict] = resources[key]["properties"]["template"].get("resources", [])
            if not nested_resources:
                continue
            # Only split nested deployments whose first resource type is tracked.
            nested_type = nested_resources[0].get("type", "").lower()
            if nested_type not in self.linked_type_map:
                continue
            # Propagate shared root parameters so the nested template deploys standalone.
            for param in self.copy_params:
                resources[key]["properties"]["template"]["parameters"][param] = parameters[param]
            result.append(resources[key]["properties"].pop("template"))
            del resources[key]
        return result

    def _get_deployments(
        self, root_dir: str, linked_base_uri: Optional[str] = None
    ) -> Tuple[dict, List[Tuple[str, dict]]]:
        """
        Used when writing to disk in linked mode. The template for each nested deployment (in consideration) gets
        separated and the nested deployment template reference is updated to templateLink using either relativePath
        or uri when linked_base_uri is provided.

        Returns the (mutated) root template content and a list of
        (linked file stem, nested template dict) pairs.
        """
        content = self.content
        result = []
        resources: Dict[str, Dict[str, dict]] = content.get("resources", {})
        if linked_base_uri and root_dir:
            # Build an absolute base <linked_base_uri>/<root_dir> without doubling the slash.
            sep = "" if linked_base_uri.endswith("/") else "/"
            root_dir = f"{linked_base_uri}{sep}{root_dir}"
        for key in resources:
            if resources[key].get("type", "").lower() != "microsoft.resources/deployments":
                continue
            nested_resources: List[dict] = resources[key]["properties"]["template"].get("resources", [])
            if not nested_resources:
                continue
            nested_type = nested_resources[0].get("type", "").lower()
            if nested_type not in self.linked_type_map:
                continue
            # Per-type counter yields names like assets_1, assetendpointprofiles_2, ...
            self.linked_type_map[nested_type] += 1
            kind = nested_type.split("/")[-1]
            linked_name = f"{kind}_{self.linked_type_map[nested_type]}"
            linked_rel_path = f"{root_dir}/{linked_name}.json"
            # relativePath for plain on-disk layout; uri when an absolute base was provided.
            template_link = {"relativePath": linked_rel_path} if not linked_base_uri else {"uri": linked_rel_path}
            resources[key]["properties"]["templateLink"] = template_link
            result.append((linked_name, resources[key]["properties"]["template"]))
            del resources[key]["properties"]["template"]
        return content, result

    def write(
        self,
        bundle_path: Optional[PurePath] = None,
        template_mode: Optional[str] = None,
        linked_base_uri: Optional[str] = None,
        file_ext: str = "json",
    ):
        """
        Write the template to `<bundle_path>.<file_ext>`. In linked mode the tracked
        nested templates are split out and written into a directory named after the
        bundle. No-op when bundle_path is not provided.
        """
        if not bundle_path:
            return
        content = None
        deployments = None
        if template_mode == TemplateMode.LINKED.value:
            content, deployments = self._get_deployments(bundle_path.name, linked_base_uri)
        content = content or self.content
        template_str = dumps(content, indent=2)
        with open(file=f"{bundle_path}.{file_ext}", mode="w", encoding="utf8") as template_file:
            template_file.write(template_str)
        # This is where assets_1.json, assetendpointprofiles_1.json, etc will be written.
        if deployments:
            Path(bundle_path).mkdir(exist_ok=True)
            for deployment in deployments:
                with open(
                    file=f"{bundle_path.joinpath(deployment[0])}.{file_ext}", mode="w", encoding="utf8"
                ) as template_file:
                    template_file.write(dumps(deployment[1], indent=2))
class TemplateGen:
    """Assembles the final ARM template from collected resource containers and maps."""

    def __init__(
        self, rcontainer_map: Dict[str, ResourceContainer], parameter_map: dict, variable_map: dict, metadata_map: dict
    ):
        self.rcontainer_map = rcontainer_map
        self.parameter_map = parameter_map
        self.variable_map = variable_map
        self.metadata_map = metadata_map

    def _prune_template_keys(self, template: dict) -> dict:
        """Drop top-level keys with empty values to keep the emitted template lean."""
        return {key: value for key, value in template.items() if value}

    def get_content(self) -> "TemplateContent":
        """Render every resource container into the base template and wrap the result."""
        template = self.get_base_format()
        resources = template["resources"]
        for symbolic_name, container in self.rcontainer_map.items():
            resources[symbolic_name] = container.get()
        template["parameters"].update(self.parameter_map)
        template["variables"].update(self.variable_map)
        template["metadata"].update(self.metadata_map)
        return TemplateContent(content=self._prune_template_keys(template))

    def get_base_format(self) -> dict:
        """Return an empty languageVersion 2.0 ARM deployment template skeleton."""
        # Key order is preserved on serialization, so keep the conventional layout.
        return {
            "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
            "languageVersion": "2.0",
            "contentVersion": "1.0.0.0",
            "metadata": {},
            "apiProfile": "",
            "definitions": {},
            "parameters": {},
            "variables": {},
            "functions": [],
            "resources": {},
            "outputs": {},
        }
def process_depends_on(
    depends_on: Optional[Union[Iterable[str], str, Iterable[StateResourceKey], StateResourceKey]] = None
) -> Optional[Iterable[str]]:
    """
    Normalize a depends_on input into a flat list of dependency strings.

    Accepts a single string, a single StateResourceKey, or an iterable mixing
    both; enum members reduce to their values. Returns None for falsy input or
    unsupported types; unrecognized items inside an iterable are silently dropped.
    """
    if not depends_on:
        return None
    if isinstance(depends_on, str):
        return [depends_on]
    if isinstance(depends_on, Iterable):
        normalized = []
        for item in depends_on:
            if isinstance(item, str):
                normalized.append(item)
            elif isinstance(item, StateResourceKey):
                normalized.append(item.value)
        return normalized
    if isinstance(depends_on, StateResourceKey):
        return [depends_on.value]
    # Unsupported scalar type: mirror the original implicit None.
    return None
# TODO: Re-use?
def get_bundle_path(instance_name: str, bundle_dir: Optional[str] = None) -> Optional[PurePath]:
    """Resolve the clone bundle output path, or None when no directory was given."""
    from ...util import normalize_dir

    if not bundle_dir:
        return None
    return normalize_dir(bundle_dir).joinpath(default_bundle_name(instance_name))
def default_bundle_name(instance_name: str) -> str:
    """Return the default clone bundle file stem for the given instance name."""
    return f"clone_{to_safe_filename(instance_name)}_aio"
def build_parameter(
    name: str,
    type: str = "string",
    metadata: Optional[dict] = None,
    value: Optional[Any] = None,
    default: Optional[Any] = None,
) -> dict:
    """
    Build a single ARM template parameter definition keyed by name.

    :param name: Parameter name (the single key of the returned dict).
    :param type: ARM parameter type; keeps the builtin-shadowing name for caller compat.
    :param metadata: Optional metadata object (e.g. description).
    :param value: Optional concrete value to embed.
    :param default: Optional defaultValue to embed.
    """
    result = {
        name: {
            "type": type,
        }
    }
    if metadata:
        result[name]["metadata"] = metadata
    # Explicit None checks: truthiness tests dropped falsy-but-valid values
    # such as 0, False or "" from value/defaultValue.
    if value is not None:
        result[name]["value"] = value
    if default is not None:
        result[name]["defaultValue"] = default
    return result
def get_role_assignment() -> dict:
    """
    Build the ARM role assignment resource granting the Contributor role to the
    deployment principal, scoped to the schema registry parameter.
    """
    # Deterministic assignment name derived from instance, cluster, principal and RG.
    assignment_name = (
        f"[guid(parameters('{TemplateParams.INSTANCE_NAME.value}'), "
        f"parameters('{TemplateParams.CLUSTER_NAME.value}'), parameters('principalId'), resourceGroup().id)]"
    )
    role_definition_id = (
        f"[subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '{CONTRIBUTOR_ROLE_ID}')]"
    )
    return {
        "type": "Microsoft.Authorization/roleAssignments",
        "name": assignment_name,
        "scope": f"[parameters('{TemplateParams.SCHEMA_REGISTRY_ID.value}')]",
        "properties": {
            "roleDefinitionId": role_definition_id,
            "principalId": "[parameters('principalId')]",
            "principalType": "ServicePrincipal",
        },
    }
# TODO: Work out goals, placement and version library
class VersionGuru:
    """Gates clone operations on the target instance's reported version."""

    def __init__(self, instance: dict):
        self.instance = instance
        # A readable version string is mandatory for any compatibility reasoning.
        self.version: str = self.instance["properties"].get("version")
        if not self.version:
            raise ValidationError("Unable to determine version of the instance.")
        self.parsed_version = parse_version(self.version)

    def ensure_compat(self, force: Optional[bool] = None):
        """Raise ValidationError unless the version is in the supported range; force bypasses the check."""
        if force:
            return
        in_supported_range = (
            parse_version(CLONE_INSTANCE_VERS_MIN) <= self.parsed_version < parse_version(CLONE_INSTANCE_VERS_MAX)
        )
        if in_supported_range:
            return
        raise ValidationError(
            f"This clone client is not compatible with the target instance version {self.version}.\n"
            f"The instance must be >={CLONE_INSTANCE_VERS_MIN},<{CLONE_INSTANCE_VERS_MAX}.\n"
            "While not recommended, you can use --force flag to continue anyway."
        )

    def get_instance_api(self) -> str:
        """Pick the IoT Operations API version appropriate for the instance version."""
        return "2024-11-01" if self.parsed_version < parse_version("1.1.0") else "2025-04-01"