azext_edge/edge/providers/orchestration/resources/instances.py (499 lines of code) (raw):

# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------

import re
from typing import Dict, Iterable, List, Optional

from azure.cli.core.azclierror import InvalidArgumentValueError, ValidationError
from azure.core.exceptions import ResourceNotFoundError
from knack.log import get_logger
from rich import print
from rich.console import Console

from ....util.az_client import (
    ResourceIdContainer,
    get_iotops_mgmt_client,
    get_msi_mgmt_client,
    get_ssc_mgmt_client,
    get_tenant_id,
    parse_resource_id,
    wait_for_terminal_state,
)
from ....util.common import (
    parse_kvp_nargs,
    should_continue_prompt,
    url_safe_hash_phrase,
)
from ....util.queryable import Queryable
from ..common import CUSTOM_LOCATIONS_API_VERSION, KEYVAULT_CLOUD_API_VERSION
from ..permissions import ROLE_DEF_FORMAT_STR, PermissionManager
from ..resource_map import IoTOperationsResourceMap

logger = get_logger(__name__)
console = Console()

# Resource types are kept lowercase; comparisons against them lowercase the other side.
SPC_RESOURCE_TYPE = "microsoft.secretsynccontroller/azurekeyvaultsecretproviderclasses"
SECRET_SYNC_RESOURCE_TYPE = "microsoft.secretsynccontroller/secretsyncs"
SERVICE_ACCOUNT_DATAFLOW = "aio-dataflow"
SERVICE_ACCOUNT_SECRETSYNC = "aio-ssc-sa"
KEYVAULT_ROLE_ID_SECRETS_USER = "4633458b-17de-408a-b874-0445c86b69e6"
KEYVAULT_ROLE_ID_READER = "21090545-7ca7-4776-b22c-e363652d74d2"

# Feature keys accepted when parse_feature_kvp_nargs() runs with strict=True.
COMPAT_FEAT_KEY_SET = {"connectors.settings.preview"}


def get_user_msg_warn_ra(prefix: str, principal_id: str, scope: str) -> str:
    """Build the user-facing message describing the Key Vault roles a principal needs on a scope."""
    return (
        f"{prefix}\n\n"
        f"The user-assigned managed identity principal '{principal_id}' needs\n"
        "'Key Vault Secrets User' and 'Key Vault Reader' or equivalent roles against scope:\n"
        f"'{scope}'\n\n"
        "Please handle this step before continuing."
    )


def get_spc_name(cluster_name: str, resource_group_name: str, instance_name: str) -> str:
    """Return 'spc-ops-' plus the first 7 chars of the hash of cluster, resource group and instance names."""
    return "spc-ops-" + url_safe_hash_phrase(f"{cluster_name}-{resource_group_name}-{instance_name}")[:7]


def get_fc_name(cluster_name: str, oidc_issuer: str, subject: str) -> str:
    """Return the first 7 chars of the hash of cluster name, OIDC issuer and subject."""
    return url_safe_hash_phrase(f"{cluster_name}-{oidc_issuer}-{subject}")[:7]


def get_cred_subject(namespace: str, service_account_name: str):
    """Return the 'system:serviceaccount:<namespace>:<service account>' subject string."""
    return f"system:serviceaccount:{namespace}:{service_account_name}"


def get_enable_syntax(instanc_name: str, resource_group_name: str) -> str:
    """Return the hint shown to users describing how to enable secret sync for an instance."""
    # NOTE: the 'instanc_name' spelling is preserved because renaming would break keyword callers.
    return (
        f"Use 'az iot ops secretsync enable -n {instanc_name} -g {resource_group_name} "
        "--user-assigned /ua/mi/resource/id'."
    )


def _merge_feature_payload(current: Optional[dict], desired: Dict[str, dict]) -> dict:
    """
    Merge a desired feature payload into the current one, component by component.

    A component's 'mode' is overwritten when provided, and its 'settings' are merged key by key,
    so updating one setting does not drop the component's other settings or its mode.
    Neither input is mutated.
    """
    merged = dict(current or {})
    for component, desired_config in desired.items():
        existing_config = merged.get(component)
        component_config = dict(existing_config) if isinstance(existing_config, dict) else {}
        if "mode" in desired_config:
            component_config["mode"] = desired_config["mode"]
        if "settings" in desired_config:
            component_settings = dict(component_config.get("settings") or {})
            component_settings.update(desired_config["settings"])
            component_config["settings"] = component_settings
        merged[component] = component_config
    return merged


class Instances(Queryable):
    """Operations on IoT Operations instances: show/list/update, identity assignment and secret sync."""

    def __init__(self, cmd, subscription_id: Optional[str] = None):
        # TODO: make sure this works correctly
        # TODO: longer term pattern?
        super().__init__(cmd=cmd, subscriptions=[subscription_id] if subscription_id else None)
        self.iotops_mgmt_client = get_iotops_mgmt_client(
            subscription_id=self.subscriptions[0],
        )
        self.msi_mgmt_client = get_msi_mgmt_client(
            subscription_id=self.default_subscription_id,
        )
        self.ssc_mgmt_client = get_ssc_mgmt_client(
            subscription_id=self.default_subscription_id,
        )
        self.permission_manager = PermissionManager(self.default_subscription_id)

    def show(self, name: str, resource_group_name: str, show_tree: Optional[bool] = None) -> Optional[dict]:
        """
        Fetch an instance.

        When show_tree is truthy the resource tree is printed and None is returned,
        otherwise the instance is returned.
        """
        result = self.iotops_mgmt_client.instance.get(instance_name=name, resource_group_name=resource_group_name)
        if show_tree:
            self._show_tree(result)
            return
        return result

    def get_ext_loc(
        self,
        name: str,
        resource_group_name: str,
    ) -> Dict[str, str]:
        """Return the 'extendedLocation' entry of the instance."""
        return self.show(name=name, resource_group_name=resource_group_name)["extendedLocation"]

    def list(self, resource_group_name: Optional[str] = None) -> Iterable[dict]:
        """List instances in the resource group when given, otherwise in the subscription."""
        if resource_group_name:
            return self.iotops_mgmt_client.instance.list_by_resource_group(resource_group_name=resource_group_name)
        return self.iotops_mgmt_client.instance.list_by_subscription()

    def _show_tree(self, instance: dict):
        """Refresh the resource map for the instance and print its tree."""
        resource_map = self.get_resource_map(instance)
        with console.status("Working..."):
            resource_map.refresh_resource_state()
        print(resource_map.build_tree(category_color="cyan"))

    def _get_associated_cl(self, instance: dict) -> dict:
        """Fetch the custom location resource referenced by the instance's extended location."""
        return self.resource_client.resources.get_by_id(
            resource_id=instance["extendedLocation"]["name"], api_version=CUSTOM_LOCATIONS_API_VERSION
        )

    def get_resource_map(self, instance: dict) -> IoTOperationsResourceMap:
        """Build a deferred-refresh resource map for the cluster hosting the instance's custom location."""
        custom_location = self._get_associated_cl(instance)
        resource_id_container = parse_resource_id(custom_location["properties"]["hostResourceId"])
        return IoTOperationsResourceMap(
            cmd=self.cmd,
            cluster_name=resource_id_container.resource_name,
            resource_group_name=resource_id_container.resource_group_name,
            subscription_id=resource_id_container.subscription_id,
            defer_refresh=True,
        )

    def update(
        self,
        name: str,
        resource_group_name: str,
        tags: Optional[Dict[str, str]] = None,
        description: Optional[str] = None,
        features: Optional[List[str]] = None,
        **kwargs: dict,
    ) -> dict:
        """
        Apply description, feature and tag changes to an instance and submit the update.

        An already fetched instance may be supplied through the 'instance' keyword argument;
        otherwise the instance is fetched. Remaining keyword arguments are forwarded to
        wait_for_terminal_state.
        """
        instance = kwargs.pop("instance", None) or self.show(name=name, resource_group_name=resource_group_name)

        if description:
            instance["properties"]["description"] = description

        if features:
            desired_features = parse_feature_kvp_nargs(features, strict=True)
            # Merge per component so an existing mode or sibling setting is not dropped.
            instance["properties"]["features"] = _merge_feature_payload(
                instance["properties"].get("features"), desired_features
            )

        # An explicit empty dict clears tags; None leaves them untouched.
        if tags or tags == {}:
            instance["tags"] = tags

        with console.status("Working..."):
            poller = self.iotops_mgmt_client.instance.begin_create_or_update(
                instance_name=name,
                resource_group_name=resource_group_name,
                resource=instance,
            )
            return wait_for_terminal_state(poller, **kwargs)

    def remove_mi_user_assigned(
        self,
        name: str,
        resource_group_name: str,
        mi_user_assigned: str,
        federated_credential_name: Optional[str] = None,
        **kwargs,
    ):
        """
        Remove a user-assigned managed identity from the instance.

        When a federated credential name is given, that credential is deleted from the identity first.

        Raises:
            ValidationError: the instance has no identities, or the identity is not associated with it.
        """
        mi_resource_id_container = parse_resource_id(mi_user_assigned)
        instance = self.show(name=name, resource_group_name=resource_group_name)

        # TODO - @digimaun
        # cluster_resource = self.get_resource_map(instance).connected_cluster.resource
        # custom_location = self._get_associated_cl(instance)
        # namespace = custom_location["properties"]["namespace"]
        # oidc_issuer = self._ensure_oidc_issuer(cluster_resource)
        # cred_subject = get_cred_subject(namespace=namespace, service_account_name=SERVICE_ACCOUNT_DATAFLOW)
        # if not federated_credential_name:
        #     federated_credential_name = get_fc_name(
        #         cluster_name=cluster_resource["name"],
        #         oidc_issuer=oidc_issuer,
        #         subject=cred_subject,
        #     )

        # TODO - @digimaun
        # NOTE(review): the credential is deleted before the association checks below; confirm that
        # deleting it when the identity turns out not to be associated is intended.
        if federated_credential_name:
            self.unfederate_msi(mi_resource_id_container, federated_credential_name)

        identity: dict = instance.get("identity") or {}
        if not identity:
            raise ValidationError("No identities are associated with the instance.")

        # Treat a missing or null map the same as an empty one.
        user_assigned_identities: dict = identity.get("userAssignedIdentities") or {}
        if mi_user_assigned not in user_assigned_identities:
            raise ValidationError(
                f"The identity '{mi_resource_id_container.resource_name}' is not associated with the instance."
            )

        del user_assigned_identities[mi_user_assigned]
        # Check if we deleted them all.
        if not user_assigned_identities:
            identity["type"] = "None"

        identity["userAssignedIdentities"] = user_assigned_identities
        instance["identity"] = identity
        return self.update(name=name, resource_group_name=resource_group_name, instance=instance, **kwargs)

    def add_mi_user_assigned(
        self,
        name: str,
        resource_group_name: str,
        mi_user_assigned: str,
        federated_credential_name: Optional[str] = None,
        use_self_hosted_issuer: Optional[bool] = None,
        **kwargs,
    ):
        """
        Responsible for federating and building the instance identity object.

        The federated credential name defaults to get_fc_name() of the cluster name, OIDC issuer
        and dataflow service account subject.
        """
        kwargs.pop("usage_type", None)  # TODO - @digimaun, unused atm.

        mi_resource_id_container = parse_resource_id(mi_user_assigned)
        instance = self.show(name=name, resource_group_name=resource_group_name)
        cluster_resource = self.get_resource_map(instance).connected_cluster.resource
        oidc_issuer = self._ensure_oidc_issuer(cluster_resource, use_self_hosted_issuer)
        custom_location = self._get_associated_cl(instance)
        namespace = custom_location["properties"]["namespace"]
        cred_subject = get_cred_subject(namespace=namespace, service_account_name=SERVICE_ACCOUNT_DATAFLOW)

        if not federated_credential_name:
            federated_credential_name = get_fc_name(
                cluster_name=cluster_resource["name"],
                oidc_issuer=oidc_issuer,
                subject=cred_subject,
            )

        self.federate_msi(
            mi_resource_id_container,
            oidc_issuer=oidc_issuer,
            subject=cred_subject,
            federated_credential_name=federated_credential_name,
        )

        identity: dict = instance.get("identity") or {}
        if not identity or identity.get("type") == "None":
            identity["type"] = "UserAssigned"
            identity["userAssignedIdentities"] = {}
        # An identity of another type may come back without (or with a null) map; avoid a KeyError.
        if identity.get("userAssignedIdentities") is None:
            identity["userAssignedIdentities"] = {}

        identity["userAssignedIdentities"][mi_user_assigned] = {}
        instance["identity"] = identity
        return self.update(name=name, resource_group_name=resource_group_name, instance=instance, **kwargs)

    def enable_secretsync(
        self,
        name: str,
        resource_group_name: str,
        mi_user_assigned: str,
        keyvault_resource_id: str,
        federated_credential_name: Optional[str] = None,
        spc_name: Optional[str] = None,
        skip_role_assignments: bool = False,
        use_self_hosted_issuer: Optional[bool] = None,
        **kwargs,
    ):
        """
        Enable secret sync for an instance by creating a Key Vault secret provider class.

        Unless skipped, Key Vault role assignments for the identity are attempted first; a failure
        there is reported as a warning rather than raised. If a secret provider class already exists
        for the instance a warning is logged and None is returned. Otherwise the identity is
        federated with the secret sync service account subject and the created secret provider
        class is returned.
        """
        # TODO: add unit test
        mi_resource_id_container = parse_resource_id(mi_user_assigned)
        keyvault_resource_id_container = parse_resource_id(keyvault_resource_id)

        with console.status("Working...") as status:
            # TODO
            self.resource_client.resources.get_by_id(
                resource_id=keyvault_resource_id_container.resource_id, api_version=KEYVAULT_CLOUD_API_VERSION
            )
            # TODO - @digimaun
            self.msi_mgmt_client._config.subscription_id = mi_resource_id_container.subscription_id
            # Kept in its own local so the 'mi_user_assigned' str parameter is not rebound to a dict.
            mi_resource: dict = self.msi_mgmt_client.user_assigned_identities.get(
                resource_group_name=mi_resource_id_container.resource_group_name,
                resource_name=mi_resource_id_container.resource_name,
            )

            role_assignment_error = None
            if not skip_role_assignments:
                role_assignment_error = self._attempt_keyvault_role_assignments(
                    keyvault_resource_id_container=keyvault_resource_id_container, mi_user_assigned=mi_resource
                )

            instance = self.show(name=name, resource_group_name=resource_group_name)
            resource_map = self.get_resource_map(instance)
            cluster_resource = resource_map.connected_cluster.resource
            custom_location = self._get_associated_cl(instance)
            namespace = custom_location["properties"]["namespace"]
            cred_subject = get_cred_subject(namespace=namespace, service_account_name=SERVICE_ACCOUNT_SECRETSYNC)
            oidc_issuer = self._ensure_oidc_issuer(cluster_resource, use_self_hosted_issuer)

            cl_resources = resource_map.connected_cluster.get_aio_resources(custom_location_id=custom_location["id"])
            secretsync_spc = self.find_existing_resources(cl_resources=cl_resources, resource_type=SPC_RESOURCE_TYPE)
            if secretsync_spc:
                status.stop()
                logger.warning(
                    f"Instance '{instance['name']}' is already enabled for secret sync.\n"
                    f"Use 'az iot ops secretsync list -n {instance['name']} -g {resource_group_name}' for details."
                )
                # Role assignments were already attempted above; do not swallow their failure.
                if role_assignment_error:
                    logger.warning(role_assignment_error)
                return

            if not federated_credential_name:
                federated_credential_name = get_fc_name(
                    cluster_name=cluster_resource["name"],
                    oidc_issuer=oidc_issuer,
                    subject=cred_subject,
                )

            self.federate_msi(
                mi_resource_id_container=mi_resource_id_container,
                oidc_issuer=oidc_issuer,
                subject=cred_subject,
                federated_credential_name=federated_credential_name,
            )

            spc_poller = self.ssc_mgmt_client.azure_key_vault_secret_provider_classes.begin_create_or_update(
                resource_group_name=resource_group_name,
                azure_key_vault_secret_provider_class_name=spc_name
                or get_spc_name(
                    cluster_name=cluster_resource["name"],
                    resource_group_name=resource_group_name,
                    instance_name=instance["name"],
                ),
                resource={
                    "location": cluster_resource["location"],
                    "extendedLocation": instance["extendedLocation"],
                    "properties": {
                        "clientId": mi_resource["properties"]["clientId"],
                        "keyvaultName": keyvault_resource_id_container.resource_name,
                        "tenantId": get_tenant_id(),
                    },
                },
            )
            result_spc = wait_for_terminal_state(spc_poller, **kwargs)

            status.stop()
            if role_assignment_error:
                logger.warning(role_assignment_error)
            return result_spc

    def list_secretsync(self, name: str, resource_group_name: str) -> Optional[List[dict]]:
        """Return the instance's secret provider classes, or log a warning and return None if there are none."""
        # TODO: add unit test
        with console.status("Working..."):
            instance = self.show(name=name, resource_group_name=resource_group_name)
            resource_map = self.get_resource_map(instance)
            cl_resources = resource_map.connected_cluster.get_aio_resources(
                custom_location_id=instance["extendedLocation"]["name"]
            )
            secretsync_spcs = self.find_existing_resources(cl_resources=cl_resources, resource_type=SPC_RESOURCE_TYPE)
            if secretsync_spcs:
                return secretsync_spcs

        logger.warning(f"No secret provider class detected.\n{get_enable_syntax(name, resource_group_name)}")

    def disable_secretsync(
        self,
        name: str,
        resource_group_name: str,
        confirm_yes: Optional[bool] = None,
        **kwargs,
    ):
        """
        Delete the instance's secret provider classes and the secret syncs that reference them.

        Returns without action when the confirmation prompt is declined. Logs a warning when no
        secret provider class exists.
        """
        # TODO: add unit test
        should_bail = not should_continue_prompt(confirm_yes=confirm_yes)
        if should_bail:
            return

        with console.status("Working..."):
            instance = self.show(name=name, resource_group_name=resource_group_name)
            resource_map = self.get_resource_map(instance)
            cl_resources = resource_map.connected_cluster.get_aio_resources(
                custom_location_id=instance["extendedLocation"]["name"]
            )
            secretsync_spcs = self.find_existing_resources(cl_resources=cl_resources, resource_type=SPC_RESOURCE_TYPE)
            secretsyncs = self.find_existing_resources(
                cl_resources=cl_resources, resource_type=SECRET_SYNC_RESOURCE_TYPE
            )

            related_secretsyncs = []
            if secretsync_spcs:
                for secretsync_spc in secretsync_spcs:
                    spc_poller = self.ssc_mgmt_client.azure_key_vault_secret_provider_classes.begin_delete(
                        resource_group_name=resource_group_name,
                        azure_key_vault_secret_provider_class_name=secretsync_spc["name"],
                    )
                    wait_for_terminal_state(spc_poller, **kwargs)

                    # get associated secret sync names
                    related_secretsyncs.extend(
                        self._find_spc_related_secretsyncs(
                            spc_name=secretsync_spc["name"],
                            secretsync_resources=secretsyncs,
                        )
                    )

                # delete associated secret syncs
                for secretsync in related_secretsyncs:
                    secretsync_poller = self.ssc_mgmt_client.secret_syncs.begin_delete(
                        resource_group_name=resource_group_name,
                        secret_sync_name=secretsync,
                    )
                    wait_for_terminal_state(secretsync_poller, **kwargs)
                return

        logger.warning(f"No secret provider class detected.\n{get_enable_syntax(name, resource_group_name)}")

    def find_existing_resources(
        self,
        cl_resources: List[dict],
        resource_type: str,
        resource_name: Optional[str] = None,
    ) -> List[dict]:
        """
        Fetch the full resources of the given type (and optionally name) among cl_resources.

        Only the secret provider class and secret sync resource types are fetched; a match of any
        other type contributes nothing to the result. The type comparison is case-insensitive.

        Raises:
            ResourceNotFoundError: cl_resources is empty.
        """
        if not cl_resources:
            raise ResourceNotFoundError(
                "No custom location resources found associated with the IoT Operations deployment."
            )

        # Normalise the requested type too, so a mixed-case argument still matches.
        target_type = resource_type.lower()
        resources = []
        for resource in cl_resources:
            resource_id_container = parse_resource_id(resource["id"])
            cl_resource_name = resource_id_container.resource_name

            # Ensure both type and name (if specified) match the resource
            is_name_matched = resource_name is None or cl_resource_name == resource_name
            is_type_matched = resource["type"].lower() == target_type
            if not (is_type_matched and is_name_matched):
                continue

            if target_type == SPC_RESOURCE_TYPE:
                resources.append(
                    self.ssc_mgmt_client.azure_key_vault_secret_provider_classes.get(
                        resource_group_name=resource_id_container.resource_group_name,
                        azure_key_vault_secret_provider_class_name=resource_id_container.resource_name,
                    )
                )
            elif target_type == SECRET_SYNC_RESOURCE_TYPE:
                resources.append(
                    self.ssc_mgmt_client.secret_syncs.get(
                        resource_group_name=resource_id_container.resource_group_name,
                        secret_sync_name=resource_id_container.resource_name,
                    )
                )

        return resources

    def _find_spc_related_secretsyncs(self, spc_name: str, secretsync_resources: List[dict]) -> List[str]:
        """Return the names of the secret syncs whose 'secretProviderClassName' equals spc_name."""
        return [
            secretsync["name"]
            for secretsync in secretsync_resources
            if secretsync["properties"]["secretProviderClassName"] == spc_name
        ]

    def _attempt_keyvault_role_assignments(
        self, keyvault_resource_id_container: ResourceIdContainer, mi_user_assigned: dict
    ) -> Optional[str]:
        """
        Returns error string if the role-assignment fails.
        """
        principal_id = mi_user_assigned["properties"]["principalId"]
        target_role_ids = [KEYVAULT_ROLE_ID_SECRETS_USER, KEYVAULT_ROLE_ID_READER]
        try:
            for role_id in target_role_ids:
                self.permission_manager.apply_role_assignment(
                    scope=keyvault_resource_id_container.resource_id,
                    principal_id=principal_id,
                    role_def_id=ROLE_DEF_FORMAT_STR.format(
                        subscription_id=keyvault_resource_id_container.subscription_id,
                        role_id=role_id,
                    ),
                )
        except Exception as e:
            # Deliberately broad: any failure is turned into a message for the caller to surface.
            return get_user_msg_warn_ra(
                prefix=f"Role assignment failed with:\n{str(e)}.",
                principal_id=principal_id,
                scope=keyvault_resource_id_container.resource_id,
            )

    def _ensure_oidc_issuer(self, cluster_resource: dict, use_self_hosted_issuer: Optional[bool] = None) -> str:
        """
        Return the cluster's OIDC issuer url ('selfHostedIssuerUrl' when use_self_hosted_issuer is truthy).

        Raises:
            ValidationError: the cluster's OIDC issuer profile or workload identity is not enabled,
                or the requested issuer url is not present.
        """
        cluster_props: dict = cluster_resource["properties"]
        # 'or {}' also covers keys that are present with a null value.
        enabled_oidc = (cluster_props.get("oidcIssuerProfile") or {}).get("enabled", False)
        security_profile: dict = cluster_props.get("securityProfile") or {}
        enabled_wlif = (security_profile.get("workloadIdentity") or {}).get("enabled", False)

        if not (enabled_oidc and enabled_wlif):
            error = f"The connected cluster '{cluster_resource['name']}' is not enabled"
            fix_with = (
                f"Please enable with 'az connectedk8s update -n {cluster_resource['name']} "
                f"-g {parse_resource_id(cluster_resource['id']).resource_group_name}"
            )
            if not enabled_oidc:
                error += " as an oidc issuer"
                fix_with += " --enable-oidc-issuer"
            if not enabled_wlif:
                sep = "" if enabled_oidc else " or"
                error += f"{sep} for workload identity federation"
                fix_with += " --enable-workload-identity"
            error += ".\n"
            error += f"{fix_with}'."
            raise ValidationError(error)

        oidc_issuer_profile: dict = cluster_props["oidcIssuerProfile"]
        issuer_key = "selfHostedIssuerUrl" if use_self_hosted_issuer else "issuerUrl"
        issuer_url = oidc_issuer_profile.get(issuer_key)
        if not issuer_url:
            raise ValidationError(f"No {issuer_key} is available. Check cluster config.")
        return issuer_url

    def federate_msi(
        self,
        mi_resource_id_container: ResourceIdContainer,
        oidc_issuer: str,
        subject: str,
        federated_credential_name: str,
    ):
        """Create a federated credential on the identity unless one with the same issuer and subject exists."""
        if self._find_federated_cred(
            mi_resource_id_container=mi_resource_id_container, issuer_url=oidc_issuer, subject=subject
        ):
            logger.debug(
                f"This OIDC issuer '{oidc_issuer}'\n"
                f"and subject '{subject}' combo are already associated "
                f"with identity '{mi_resource_id_container.resource_name}'.\n"
                "No new federated credential will be created."
            )
            return

        # TODO - @digimaun
        self.msi_mgmt_client._config.subscription_id = mi_resource_id_container.subscription_id
        self.msi_mgmt_client.federated_identity_credentials.create_or_update(
            resource_group_name=mi_resource_id_container.resource_group_name,
            resource_name=mi_resource_id_container.resource_name,
            federated_identity_credential_resource_name=federated_credential_name,
            parameters={
                "properties": {
                    "subject": subject,
                    "audiences": ["api://AzureADTokenExchange"],
                    "issuer": oidc_issuer,
                }
            },
        )

    def unfederate_msi(
        self,
        mi_resource_id_container: ResourceIdContainer,
        federated_credential_name: str,
    ):
        """Delete the named federated credential from the identity."""
        # TODO - @digimaun
        self.msi_mgmt_client._config.subscription_id = mi_resource_id_container.subscription_id
        self.msi_mgmt_client.federated_identity_credentials.delete(
            resource_group_name=mi_resource_id_container.resource_group_name,
            resource_name=mi_resource_id_container.resource_name,
            federated_identity_credential_resource_name=federated_credential_name,
        )

    def _find_federated_cred(
        self, mi_resource_id_container: ResourceIdContainer, issuer_url: str, subject: str
    ) -> Optional[dict]:
        """Return the first federated credential of the identity matching issuer and subject, else None."""
        # TODO - @digimaun
        self.msi_mgmt_client._config.subscription_id = mi_resource_id_container.subscription_id
        cred_iteratable = self.msi_mgmt_client.federated_identity_credentials.list(
            resource_group_name=mi_resource_id_container.resource_group_name,
            resource_name=mi_resource_id_container.resource_name,
        )
        for cred in cred_iteratable:
            cred_props: dict = cred["properties"]
            if cred_props.get("issuer") == issuer_url and cred_props.get("subject") == subject:
                return cred


def ensure_feature_key_compat(features: Dict[str, str]):
    """
    Ensure every feature key is in COMPAT_FEAT_KEY_SET.

    Raises:
        InvalidArgumentValueError: a key is not supported.
    """
    for feat in features:
        if feat not in COMPAT_FEAT_KEY_SET:
            # Sorted so the message is deterministic regardless of set iteration order.
            raise InvalidArgumentValueError(f"Supported feature keys: {', '.join(sorted(COMPAT_FEAT_KEY_SET))}")


def parse_feature_kvp_nargs(features: Optional[List[str]] = None, strict: bool = False) -> Optional[Dict[str, dict]]:
    """
    Convert feature key=value arguments into a nested payload keyed by component.

    Keys take the form '{component}.mode' or '{component}.settings.{setting}'. A mode value must be
    'Stable', 'Preview' or 'Disabled'; a setting value must be 'Enabled' or 'Disabled'.
    When nothing was parsed, the (falsy) result of parse_kvp_nargs is returned as-is.

    Raises:
        InvalidArgumentValueError: with strict=True a key is unsupported, or any key or value is
            invalid. All key and value problems are reported together, one per line.
    """
    parsed_features: Dict[str, str] = parse_kvp_nargs(features)
    if not parsed_features:
        return parsed_features

    if strict:
        ensure_feature_key_compat(parsed_features)

    mode_pattern = re.compile(r"^\w+\.mode$")
    setting_pattern = re.compile(r"^\w+\.settings\.[^.\s]+$")

    features_payload = {}
    errors = []
    for key, value in parsed_features.items():
        if not (mode_pattern.match(key) or setting_pattern.match(key)):
            errors.append(
                f"{key} is invalid. Feature keys must be in the form "
                f"'{{component}}.mode' or '{{component}}.settings.{{setting}}'."
            )
            continue

        split_key = key.split(".")
        component = split_key[0]
        # The patterns above guarantee exactly two parts for a mode key and three for a setting key.
        if len(split_key) >= 3:
            if value not in ["Enabled", "Disabled"]:
                errors.append(f"{key} has an invalid value. Known setting values are: 'Enabled' or 'Disabled'.")
                continue
            features_payload.setdefault(component, {}).setdefault("settings", {})[split_key[2]] = value
        else:
            if value not in ["Stable", "Preview", "Disabled"]:
                errors.append(f"{key} has an invalid value. Known mode values are: 'Stable', 'Preview' or 'Disabled'.")
                continue
            features_payload.setdefault(component, {})["mode"] = value

    if errors:
        raise InvalidArgumentValueError("\n".join(errors))

    return features_payload