# azext_iot/iothub/providers/state.py

# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""Provider for the `az iot hub state` commands: save, upload, and migrate IoT Hub state."""

import json
import os
from typing import Dict, List, Optional

from azure.cli.core.azclierror import (
    AzCLIError,
    BadRequestError,
    FileOperationError,
    MutuallyExclusiveArgumentError,
    RequiredArgumentMissingError,
    ResourceNotFoundError,
)
from knack.log import get_logger
from knack.prompting import prompt_y_n
from tqdm import tqdm

import azext_iot.iothub.providers.helpers.state_strings as usr_msgs
from azext_iot._factory import iot_hub_service_factory
from azext_iot.common._azure import (
    parse_cosmos_db_connection_string,
    parse_iot_hub_message_endpoint_connection_string,
    parse_storage_container_connection_string,
)
from azext_iot.common.embedded_cli import EmbeddedCLI
from azext_iot.common.shared import ConfigType, DeviceAuthApiType, DeviceAuthType
from azext_iot.iothub.common import (
    IMMUTABLE_AND_DUPLICATE_MODULE_TWIN_FIELDS,
    IMMUTABLE_DEVICE_IDENTITY_FIELDS,
    IMMUTABLE_MODULE_IDENTITY_FIELDS,
    AuthenticationType,
    HubAspects,
)
from azext_iot.iothub.providers.base import IoTHubProvider
from azext_iot.operations.hub import (
    _iot_device_create,
    _iot_device_delete,
    _iot_device_module_create,
    _iot_device_module_list,
    _iot_device_module_show,
    _iot_device_module_twin_show,
    _iot_device_module_twin_update,
    _iot_device_set_parent,
    _iot_device_show,
    _iot_device_twin_list,
    _iot_device_twin_update,
    _iot_edge_set_modules,
    _iot_hub_configuration_create,
    _iot_hub_configuration_delete,
    _iot_hub_configuration_list,
)

logger = get_logger(__name__)
cli = EmbeddedCLI()


class StateProvider(IoTHubProvider):
    """Saves, uploads, and migrates IoT Hub state (ARM template, configurations, devices)."""

    def __init__(
        self,
        cmd,
        hub: Optional[str] = None,
        rg: Optional[str] = None,
        login: Optional[str] = None,
        auth_type_dataplane: Optional[str] = None,
        export: bool = False,
    ):
        """Resolve the destination hub target.

        For export the hub must exist, so the ResourceNotFoundError is re-raised;
        for upload/migrate a missing hub is tolerated (it may be created from the
        saved ARM template), so target/resolver are cleared instead.
        """
        try:
            super(StateProvider, self).__init__(
                cmd=cmd,
                hub_name=hub,
                rg=rg,
                login=login,
                auth_type_dataplane=auth_type_dataplane,
            )
        except ResourceNotFoundError:
            if export:
                raise
            # Hub does not exist yet - may be created later from the ARM aspect.
            self.target = None
            self.resolver = None
        self.auth_type = auth_type_dataplane
        self.login = login
        # NOTE(review): assumes the base __init__ sets self.rg / self.hub_name
        # before discovery can raise - confirm against IoTHubProvider.
        if self.target:
            self.hub_name = self.target["name"]
        if not self.rg and self.target:
            self.rg = self.target.get("resourcegroup")

    def _get_client(self):
        """Return the IoT Hub control-plane (resource) client."""
        return iot_hub_service_factory(self.cmd.cli_ctx)

    def save_state(self, state_file: str, replace: bool = False, hub_aspects: Optional[List[str]] = None):
        """Write the hub state to a file.

        Prompts before overwriting a non-empty file unless `replace` is set.
        `hub_aspects` defaults to all aspects (arm, configurations, devices).
        """
        if (
            os.path.exists(state_file)
            and os.stat(state_file).st_size
            and not replace
            and not prompt_y_n(msg=usr_msgs.OVERWRITE_FILE_MSG.format(state_file), default="n")
        ):
            raise FileOperationError(usr_msgs.FILE_NOT_EMPTY_ERROR)
        if not hub_aspects:
            hub_aspects = HubAspects.list()
        # The ARM aspect needs control-plane access, which a connection-string login lacks.
        if HubAspects.Arm.value in hub_aspects and self.login:
            raise MutuallyExclusiveArgumentError(usr_msgs.LOGIN_WITH_ARM_ERROR)

        hub_state = self.process_hub_to_dict(self.target, hub_aspects)
        try:
            with open(state_file, 'w', encoding='utf-8') as f:
                json.dump(hub_state, f, indent=4, sort_keys=True)
            logger.info(usr_msgs.SAVE_STATE_MSG.format(self.hub_name, state_file))
        except FileNotFoundError:
            raise FileOperationError(usr_msgs.FILE_NOT_FOUND_ERROR.format(state_file))

    def upload_state(self, state_file: str, replace: bool = False, hub_aspects: Optional[List[str]] = None):
        """Recreate hub state on the destination hub from a state file."""
        if not hub_aspects:
            hub_aspects = HubAspects.list()
        if HubAspects.Arm.value in hub_aspects and self.login:
            raise MutuallyExclusiveArgumentError(usr_msgs.LOGIN_WITH_ARM_ERROR)
        if not self.rg and not self.target:
            raise RequiredArgumentMissingError(usr_msgs.MISSING_RG_ON_CREATE_ERROR)
        # Without the ARM aspect the hub cannot be created, so it must already exist.
        if HubAspects.Arm.value not in hub_aspects and not self.target:
            raise ResourceNotFoundError(usr_msgs.TARGET_HUB_NOT_FOUND_MSG.format(self.hub_name))

        # NOTE(review): deletion happens before the state file is read, so a missing
        # file still wipes the hub when --replace is used - confirm intended ordering.
        self.delete_aspects(replace, hub_aspects)

        try:
            with open(state_file, 'r', encoding='utf-8') as f:
                hub_state = json.load(f)
            self.upload_hub_from_dict(hub_state, hub_aspects)
            logger.info(usr_msgs.UPLOAD_STATE_MSG.format(state_file, self.hub_name))
        except FileNotFoundError:
            raise FileOperationError(usr_msgs.FILE_NOT_FOUND_ERROR.format(state_file))

    def migrate_state(
        self,
        orig_hub: Optional[str] = None,
        orig_rg: Optional[str] = None,
        orig_hub_login: Optional[str] = None,
        replace: bool = False,
        hub_aspects: Optional[List[str]] = None,
    ):
        """Migrates state from original hub to destination hub."""
        orig_hub_target = self.discovery.get_target(
            resource_name=orig_hub,
            resource_group_name=orig_rg,
            login=orig_hub_login,
            auth_type=self.auth_type,
        )
        if not hub_aspects:
            hub_aspects = HubAspects.list()
        if HubAspects.Arm.value in hub_aspects and self.login:
            raise MutuallyExclusiveArgumentError(usr_msgs.LOGIN_WITH_ARM_ERROR)
        if HubAspects.Arm.value not in hub_aspects and not self.target:
            raise ResourceNotFoundError(usr_msgs.TARGET_HUB_NOT_FOUND_MSG.format(self.hub_name))
        if not self.rg and not self.target:
            self.rg = orig_hub_target.get("resourcegroup")

        # Command modifies hub_aspect - make copy so we can reuse for upload
        hub_state = self.process_hub_to_dict(orig_hub_target, hub_aspects[:])
        self.delete_aspects(replace, hub_aspects)
        self.upload_hub_from_dict(hub_state, hub_aspects)
        logger.info(usr_msgs.MIGRATE_STATE_MSG.format(orig_hub, self.hub_name))

    def delete_aspects(self, replace, hub_aspects: List[str]):
        """
        Delete all necessary hub aspects if the hub exists.

        Delete arm (certificates) and dataplane aspects only present in hub_aspects.
        """
        if self.target and replace:
            if HubAspects.Configurations.value in hub_aspects:
                self.delete_all_configs()
            if HubAspects.Devices.value in hub_aspects:
                self.delete_all_devices()
            if HubAspects.Arm.value in hub_aspects:
                self.delete_all_certificates()

    def process_hub_to_dict(self, target: Dict[str, str], hub_aspects: List[str]) -> dict:
        """Returns a dictionary containing the hub state.

        Note: consumes entries from hub_aspects as each aspect is processed.

        Full structure: {
            "arm": full_arm_template,
            "configurations": {
                "admConfigurations": { "config_id": { config_properties } },
                "edgeDeployments": { "config_id": { config_properties } }
            },
            "devices": {
                "device_id": {
                    "identity": { identity_properties (and properties shared with twin) },
                    "twin": { twin_properties },
                    "parent": parent_id,
                    "modules": {
                        "module_id": {
                            "identity": { identity_properties },
                            "twin": { twin_properties }
                        }
                    }
                }
            }
        }
        """
        hub_state = {}
        if HubAspects.Configurations.value in hub_aspects:
            hub_aspects.remove(HubAspects.Configurations.value)
            # Basic tier does not support list config
            try:
                all_configs = _iot_hub_configuration_list(target=target)
                hub_state["configurations"] = {}
                adm_configs = {}
                for c in tqdm(all_configs, desc=usr_msgs.SAVE_CONFIGURATIONS_DESC, ascii=" #"):
                    # ADM configs carry deviceContent/moduleContent; edge deployments do not.
                    if c["content"].get("deviceContent") or c["content"].get("moduleContent"):
                        # Drop service-generated fields that cannot be re-uploaded.
                        for key in ["createdTimeUtc", "etag", "lastUpdatedTimeUtc", "schemaVersion"]:
                            c.pop(key, None)
                        adm_configs[c["id"]] = c
                hub_state["configurations"]["admConfigurations"] = adm_configs

                hub_state["configurations"]["edgeDeployments"] = {
                    c["id"]: c for c in all_configs if c["content"].get("modulesContent")
                }
            except AzCLIError:
                logger.warning(usr_msgs.SAVE_CONFIGURATIONS_RETRIEVE_FAIL_MSG)

        if HubAspects.Devices.value in hub_aspects:
            hub_aspects.remove(HubAspects.Devices.value)
            devices = self.download_devices(target=target)
            if devices:
                hub_state["devices"] = devices

        # Controlplane using ARM
        if HubAspects.Arm.value in hub_aspects:
            hub_name = target.get("entity").split(".")[0]
            hub_rg = target.get("resourcegroup")
            control_plane_obj = self.discovery.find_resource(hub_name, hub_rg)
            if not hub_rg:
                hub_rg = control_plane_obj.additional_properties["resourcegroup"]
            hub_resource_id = control_plane_obj.id
            hub_arm = cli.invoke(
                f"group export -n {hub_rg} --resource-ids '{hub_resource_id}' --skip-all-params"
            ).as_json()
            hub_state["arm"] = hub_arm
            hub_resource = hub_state["arm"]["resources"][0]
            # Prune endpoints/identities/file-upload configs we cannot resolve.
            self.check_controlplane(hub_resource=hub_resource)
            print(usr_msgs.SAVE_ARM_DESC)

        return hub_state

    def upload_hub_from_dict(self, hub_state: dict, hub_aspects: List[str]):
        """Apply a saved hub state dict to the destination hub.

        Deploys the ARM aspect first (creating or updating the hub), then the
        data-plane aspects (configurations, devices). Consumes entries from
        hub_aspects; any aspect left over (absent from the state file) is warned about.
        """
        # Control plane
        if HubAspects.Arm.value in hub_aspects and hub_state.get("arm"):
            hub_aspects.remove(HubAspects.Arm.value)
            hub_resources = []
            hub_resource = hub_state["arm"]["resources"][0]
            hub_resource["name"] = self.hub_name
            if self.target:
                # remove/overwrite attributes that cannot be changed on an existing hub
                current_hub_resource = self.discovery.find_resource(self.hub_name, self.rg)
                if not self.rg:
                    self.rg = current_hub_resource.additional_properties["resourcegroup"]
                # location
                hub_resource["location"] = current_hub_resource.location
                # sku
                hub_resource["sku"] = current_hub_resource.sku.serialize()
                # event hub partitions
                partition_count = current_hub_resource.properties.event_hub_endpoints["events"].partition_count
                hub_resource["properties"]["eventHubEndpoints"]["events"]["partitionCount"] = partition_count
                # enable data residency
                if (
                    hasattr(current_hub_resource.properties, "enable_data_residency")
                    and "enableDataResidency" in hub_resource["properties"]
                ):
                    hub_resource["properties"]["enableDataResidency"] = (
                        current_hub_resource.properties.enable_data_residency
                    )
                # features - hub takes care of this but we will do this just incase
                hub_resource["properties"]["features"] = current_hub_resource.properties.features
                # TODO check for other props and add them as they pop up
            else:
                # If there is a system assigned identity endpoint, the ARM deployment may stall rather than failing
                # outright. So warn and fail before deployment.
                identity_endpoints = []
                for endpoint_list in hub_resource["properties"]["routing"]["endpoints"].values():
                    for endpoint in endpoint_list:
                        if (
                            endpoint["authenticationType"] == AuthenticationType.IdentityBased.value
                            and not endpoint.get("identity")
                        ):
                            identity_endpoints.append(endpoint["name"])
                if len(identity_endpoints) > 0:
                    raise BadRequestError(
                        usr_msgs.FAILED_ARM_IDENTITY_ENDPOINT_MSG.format(self.hub_name, identity_endpoints)
                    )
            hub_resources.append(hub_resource)
            # Only hub certificates are redeployed; other child resources (e.g. private
            # endpoints) are dropped with a warning.
            hub_certs = [res for res in hub_state["arm"]["resources"][1:] if res["type"].endswith("certificates")]
            if len(hub_certs) < len(hub_state["arm"]["resources"]) - 1:
                logger.warning(usr_msgs.PRIVATE_ENDPOINT_WARNING_MSG)
            for res in hub_certs:
                # Re-point the cert resource (name + dependsOn) at the destination hub.
                res["name"] = self.hub_name + "/" + res["name"].split("/")[1]
                depends_on = res["dependsOn"][0].split("'")
                depends_on[3] = self.hub_name
                res["dependsOn"][0] = "'".join(depends_on)
            hub_resources.extend(hub_certs)
            hub_state["arm"]["resources"] = hub_resources

            # The deployment template is written to a temp file and removed afterwards.
            state_file = f"arm_deployment-{self.hub_name}.json"
            with open(state_file, "w", encoding='utf-8') as f:
                json.dump(hub_state["arm"], f)
            print(f"Starting Arm Deployment for IoT Hub {self.hub_name}.")
            arm_result = cli.invoke(
                f"deployment group create --template-file {state_file} -g {self.rg}"
            )
            os.remove(state_file)
            if not arm_result.success():
                raise BadRequestError(usr_msgs.FAILED_ARM_MSG.format(self.hub_name))
            if not self.target:
                self.target = self.discovery.get_target(
                    hub_resource["name"],
                    resource_group_name=arm_result.as_json()["resourceGroup"],
                )
                print(usr_msgs.CREATE_IOT_HUB_MSG.format(self.hub_name))
            else:
                print(usr_msgs.UPDATED_IOT_HUB_MSG.format(self.hub_name))

        # block if the arm aspect is specified, the state file does not have the arm aspect, and the
        # hub does not exist
        if not self.target:
            raise BadRequestError(usr_msgs.HUB_NOT_CREATED_MSG.format(self.hub_name))

        # Data plane
        # upload configurations
        if HubAspects.Configurations.value in hub_aspects and hub_state.get("configurations"):
            hub_aspects.remove(HubAspects.Configurations.value)
            configs = hub_state["configurations"]["admConfigurations"]
            edge_deployments = hub_state["configurations"]["edgeDeployments"]
            config_progress = tqdm(
                total=len(configs) + len(edge_deployments),
                desc=usr_msgs.UPLOAD_CONFIGURATIONS_DESC,
                ascii=" #",
            )
            for config_id, config_obj in configs.items():
                try:
                    _iot_hub_configuration_create(
                        target=self.target,
                        config_id=config_id,
                        content=json.dumps(config_obj["content"]),
                        target_condition=config_obj["targetCondition"],
                        priority=config_obj["priority"],
                        labels=json.dumps(config_obj["labels"]),
                        metrics=json.dumps(config_obj["metrics"]),
                    )
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_ADM_CONFIG_ERROR_MSG.format(config_id, e))
                config_progress.update(1)
            layered_configs = {}
            for config_id, config_obj in edge_deployments.items():
                # A deployment without $edgeAgent desired properties is a layered deployment.
                if "properties.desired" not in config_obj["content"]["modulesContent"]["$edgeAgent"]:
                    config_type = ConfigType.layered
                    layered_configs[config_id] = config_obj
                else:
                    config_type = ConfigType.edge
                try:
                    _iot_hub_configuration_create(
                        target=self.target,
                        config_id=config_id,
                        content=json.dumps(config_obj["content"]),
                        target_condition=config_obj["targetCondition"],
                        priority=config_obj["priority"],
                        labels=json.dumps(config_obj["labels"]),
                        metrics=json.dumps(config_obj["metrics"]),
                        config_type=config_type,
                    )
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_EDGE_DEPLOYMENT_ERROR_MSG.format(config_id, e))
                config_progress.update(1)
            # Do layered configs after edge configs.
            # TODO: create an algo to figure out order
            for config_id, config_obj in layered_configs.items():
                try:
                    _iot_hub_configuration_create(
                        target=self.target,
                        config_id=config_id,
                        content=json.dumps(config_obj["content"]),
                        target_condition=config_obj["targetCondition"],
                        priority=config_obj["priority"],
                        labels=json.dumps(config_obj["labels"]),
                        metrics=json.dumps(config_obj["metrics"]),
                        # Everything in layered_configs is layered by construction; the
                        # previous code reused the (possibly stale) loop variable here.
                        config_type=ConfigType.layered,
                    )
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_EDGE_DEPLOYMENT_ERROR_MSG.format(config_id, e))
                config_progress.update(1)

        # Devices
        if HubAspects.Devices.value in hub_aspects and hub_state.get("devices"):
            hub_aspects.remove(HubAspects.Devices.value)
            child_to_parent = {}
            for device_id, device_obj in tqdm(
                hub_state["devices"].items(), desc=usr_msgs.UPLOAD_DEVICE_MSG, ascii=" #"
            ):
                # upload device identity and twin
                try:
                    self.upload_device_identity(device_id, device_obj["identity"])
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_DEVICE_IDENTITY_MSG.format(device_id, e))
                    continue
                try:
                    _iot_device_twin_update(
                        target=self.target, device_id=device_id, parameters=device_obj["twin"]
                    )
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_DEVICE_TWIN_MSG.format(device_id, e))
                    continue

                edge_modules = {}
                for module_id, module_obj in device_obj.get("modules", {}).items():
                    # upload will fail for modules that start with $ or have no auth
                    if module_id.startswith("$") or module_obj["identity"]["authentication"]["type"] == "none":
                        # System/no-auth modules are applied via set-modules instead.
                        edge_modules[module_id] = {
                            "properties.desired": module_obj["twin"]["properties"]["desired"]
                        }
                    else:
                        module_identity = module_obj["identity"]
                        module_twin = module_obj["twin"]
                        try:
                            self.upload_module_identity(device_id, module_id, module_identity)
                        except AzCLIError as e:
                            logger.error(
                                usr_msgs.UPLOAD_DEVICE_MODULE_IDENTITY_MSG.format(module_id, device_id, e)
                            )
                            continue
                        try:
                            _iot_device_module_twin_update(
                                target=self.target,
                                device_id=device_id,
                                module_id=module_id,
                                parameters=module_twin,
                            )
                        except AzCLIError as e:
                            logger.error(usr_msgs.UPLOAD_DEVICE_TWIN_MSG.format(module_id, device_id, e))
                            continue
                if edge_modules:
                    try:
                        _iot_edge_set_modules(
                            target=self.target,
                            device_id=device_id,
                            content=json.dumps({"modulesContent": edge_modules}),
                        )
                    except AzCLIError as e:
                        logger.error(usr_msgs.UPLOAD_EDGE_MODULE_MSG.format(device_id, e))
                        continue
                if device_obj.get("parent"):
                    child_to_parent[device_id] = device_obj["parent"]

            # set parent-child relationships after all devices are created
            for device_id in child_to_parent:
                try:
                    _iot_device_set_parent(
                        target=self.target, parent_id=child_to_parent[device_id], device_id=device_id
                    )
                except AzCLIError as e:
                    logger.error(
                        usr_msgs.UPLOAD_DEVICE_RELATIONSHIP_MSG.format(child_to_parent[device_id], device_id, e)
                    )
                    continue

        # Leftover aspects
        if hub_aspects:
            logger.warning(usr_msgs.MISSING_HUB_ASPECTS_MSG.format(', '.join(hub_aspects)))

    # Download commands
    def download_devices(self, target: Dict[str, str]) -> dict:
        """
        Fetch devices and convert to the following structure:
        {
            "device_id": {
                "identity": { identity_properties (and properties shared with twin) },
                "twin": { twin_properties },
                "parent": parent_id,
                "modules": {
                    "module_id": {
                        "identity": { identity_properties },
                        "twin": { twin_properties }
                    }
                }
            }
        }
        """
        # if incorrect permissions, will fail to retrieve any devices
        devices = {}
        try:
            twins = _iot_device_twin_list(target=target, top=None)
        except AzCLIError:
            logger.warning(usr_msgs.SAVE_DEVICES_RETRIEVE_FAIL_MSG)
            return {}
        for device_twin in tqdm(twins, desc=usr_msgs.SAVE_DEVICE_DESC, ascii=" #"):
            device_id = device_twin["deviceId"]
            device_obj = {}
            if device_twin.get("parentScopes"):
                # parentScopes entry looks like "<scheme>://<parent_id>-<suffix>".
                device_parent = device_twin["parentScopes"][0].split("://")[1]
                device_obj["parent"] = device_parent[:device_parent.rfind("-")]
            # Basic tier does not support device twins, modules
            if not device_twin.get("properties"):
                continue
            # put properties + tags into the saved twin; reported properties and
            # twin metadata cannot be uploaded.
            device_twin["properties"].pop("reported")
            for key in ["$metadata", "$version"]:
                device_twin["properties"]["desired"].pop(key)
            device_obj["twin"] = {
                "properties": device_twin.pop("properties")
            }
            if device_twin.get("tags"):
                device_obj["twin"]["tags"] = device_twin.pop("tags")
            # create the device identity from the device twin
            # primary and secondary keys show up in the "show" output but not in the "list" output
            authentication = {
                "type": device_twin.pop("authenticationType"),
                "x509Thumbprint": device_twin.pop("x509Thumbprint"),
            }
            if authentication["type"] == DeviceAuthApiType.sas.value:
                # Cannot retrieve the sas key for some reason - throw out the device
                try:
                    identity_show = _iot_device_show(target=target, device_id=device_id)
                    authentication["symmetricKey"] = identity_show["authentication"]["symmetricKey"]
                except AzCLIError:
                    logger.warning(usr_msgs.SAVE_SPECIFIC_DEVICE_RETRIEVE_FAIL_MSG.format(device_id))
                    continue
            device_twin["authentication"] = authentication
            for key in IMMUTABLE_DEVICE_IDENTITY_FIELDS:
                device_twin.pop(key, None)
            device_obj["identity"] = device_twin

            # if unable to retrieve modules, log and continue without modules
            module_objs = {}
            try:
                module_objs = _iot_device_module_list(target=target, device_id=device_id)
            except AzCLIError:
                logger.warning(usr_msgs.SAVE_SPECIFIC_DEVICE_MODULES_RETRIEVE_FAIL_MSG.format(device_id))
            if module_objs:
                device_obj["modules"] = {}
                for module in module_objs:
                    module = module.serialize()
                    module_id = module["moduleId"]
                    # Fail to retrieve module identity - log and continue without module
                    try:
                        module_identity_show = _iot_device_module_show(
                            target=target, device_id=device_id, module_id=module_id
                        )
                    except AzCLIError:
                        logger.warning(
                            usr_msgs.SAVE_SPECIFIC_DEVICE_SPECIFIC_MODULE_RETRIEVE_FAIL_MSG.format(
                                "identity", module_id, device_id
                            )
                        )
                        continue
                    module["authentication"] = module_identity_show["authentication"]
                    for key in IMMUTABLE_MODULE_IDENTITY_FIELDS:
                        module.pop(key)
                    # Fail to retrieve module twin - log and continue without module
                    try:
                        module_twin = _iot_device_module_twin_show(
                            target=target, device_id=device_id, module_id=module_id
                        )
                    except AzCLIError:
                        logger.warning(
                            usr_msgs.SAVE_SPECIFIC_DEVICE_SPECIFIC_MODULE_RETRIEVE_FAIL_MSG.format(
                                "twin", module_id, device_id
                            )
                        )
                        continue
                    for key in IMMUTABLE_AND_DUPLICATE_MODULE_TWIN_FIELDS:
                        module_twin.pop(key)
                    for key in ["$metadata", "$version"]:
                        module_twin["properties"]["desired"].pop(key)
                    module_twin["properties"].pop("reported")
                    device_obj["modules"][module_id] = {
                        "identity": module,
                        "twin": module_twin,
                    }
            devices[device_id] = device_obj
        return devices

    @staticmethod
    def _hostname_prefix(uri: str) -> str:
        """Return the first label of a URI's host name, tolerating any scheme prefix.

        Replaces the previous `uri.strip("https://")` / `uri.strip("sb://")` calls:
        str.strip removes a *character set*, which corrupted names beginning with
        any of those characters (e.g. "https://shstorage..." -> "torage").
        """
        return uri.split("://", 1)[-1].split(".")[0]

    def check_controlplane(self, hub_resource: dict):
        """
        Check the controlplane for missing resources and connection strings.

        Specifically will fetch connection strings for endpoints and file upload and check for
        existance of endpoint resources, file upload resources, and user assigned identity
        resources.

        If cannot retrieve (due to missing permissions or resource), log and remove endpoint,
        file upload, or user assigned identity. If an endpoint is removed, any route associated
        with it needs to be removed too.
        """
        # User Assigned Identites
        identities = hub_resource["identity"].get("userAssignedIdentities")
        removed_identities = []
        if identities:
            existing_identities = {}
            for identity in identities:
                success = cli.invoke(f"identity show --ids {identity}").success()
                if success:
                    existing_identities[identity] = {}
                else:
                    logger.warning(
                        usr_msgs.SAVE_UAI_RETRIEVE_FAIL_MSG.format(identity)
                    )
                    removed_identities.append(identity)
            hub_resource["identity"]["userAssignedIdentities"] = existing_identities

        # Endpoints - build up new list of endpoints we want to keep and track the removed ones
        endpoints = hub_resource["properties"]["routing"]["endpoints"]
        removed_endpoints = []

        # Cosmos Db
        cosmos_endpoints = []
        for ep in endpoints.get("cosmosDBSqlContainers", []):
            account_name = self._hostname_prefix(ep["endpointUri"])
            if ep.get("primaryKey") or ep.get("secondaryKey"):
                # Key-based endpoint: refresh the account keys from the service.
                try:
                    cosmos_keys = cli.invoke(
                        "cosmosdb keys list --name {} --resource-group {} --type connection-strings "
                        "--subscription {}".format(
                            account_name, ep.get("resourceGroup"), ep.get("subscriptionId")
                        )
                    ).as_json()
                except AzCLIError:
                    logger.warning(
                        usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Cosmos DB Sql Collection", ep["name"])
                    )
                    removed_endpoints.append(ep["name"])
                    continue
                for cs_object in cosmos_keys["connectionStrings"]:
                    if cs_object["description"] == "Primary SQL Connection String" and ep.get("primaryKey"):
                        ep["primaryKey"] = parse_cosmos_db_connection_string(
                            cs_object["connectionString"]
                        )["AccountKey"]
                    if cs_object["description"] == "Secondary SQL Connection String" and ep.get("secondaryKey"):
                        ep["secondaryKey"] = parse_cosmos_db_connection_string(
                            cs_object["connectionString"]
                        )["AccountKey"]
            elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
                # NOTE(review): no `continue` here, so the endpoint is still appended
                # below even though its routes are removed - confirm intended.
                logger.warning(
                    usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
                        "Cosmos DB Sql Collection", ep["name"], ep["identity"].get("userAssignedIdentity")
                    )
                )
                removed_endpoints.append(ep["name"])
            else:
                # Identity-based endpoint: just verify the container exists.
                success = cli.invoke(
                    "cosmosdb sql container show --account-name {} --resource-group {} --database-name {} "
                    "--name {} --subscription {}".format(
                        account_name,
                        ep.get("resourceGroup"),
                        ep["databaseName"],
                        ep["collectionName"],
                        ep.get("subscriptionId"),
                    )
                ).success()
                if not success:
                    logger.warning(
                        usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Cosmos DB Sql Collection", ep["name"])
                    )
                    removed_endpoints.append(ep["name"])
                    continue
            cosmos_endpoints.append(ep)
        endpoints["cosmosDBSqlContainers"] = cosmos_endpoints

        # Event Hub
        eventhub_endpoints = []
        for ep in endpoints["eventHubs"]:
            if ep.get("connectionString"):
                endpoint_props = parse_iot_hub_message_endpoint_connection_string(ep["connectionString"])
                namespace = self._hostname_prefix(endpoint_props["Endpoint"])
                try:
                    ep["connectionString"] = cli.invoke(
                        "eventhubs eventhub authorization-rule keys list --namespace-name {} --resource-group {} "
                        "--eventhub-name {} --name {} --subscription {}".format(
                            namespace,
                            ep.get("resourceGroup"),
                            endpoint_props["EntityPath"],
                            endpoint_props["SharedAccessKeyName"],
                            ep.get("subscriptionId"),
                        )
                    ).as_json()["primaryConnectionString"]
                except AzCLIError:
                    logger.warning(usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Event Hub", ep["name"]))
                    removed_endpoints.append(ep["name"])
                    continue
            elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
                # NOTE(review): no `continue` here (see Cosmos DB note above).
                logger.warning(
                    usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
                        "Event Hub", ep["name"], ep["identity"].get("userAssignedIdentity")
                    )
                )
                removed_endpoints.append(ep["name"])
            else:
                namespace = self._hostname_prefix(ep["endpointUri"])
                success = cli.invoke(
                    "eventhubs eventhub show --namespace-name {} --resource-group {} "
                    "--name {} --subscription {}".format(
                        namespace, ep.get("resourceGroup"), ep["entityPath"], ep.get("subscriptionId")
                    )
                ).success()
                if not success:
                    logger.warning(usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Event Hub", ep["name"]))
                    removed_endpoints.append(ep["name"])
                    continue
            eventhub_endpoints.append(ep)
        endpoints["eventHubs"] = eventhub_endpoints

        # Service Bus Queue
        servicebus_queue_endpoints = []
        for ep in endpoints["serviceBusQueues"]:
            if ep.get("connectionString"):
                endpoint_props = parse_iot_hub_message_endpoint_connection_string(ep["connectionString"])
                namespace = self._hostname_prefix(endpoint_props["Endpoint"])
                try:
                    ep["connectionString"] = cli.invoke(
                        "servicebus queue authorization-rule keys list --namespace-name {} --resource-group {} "
                        "--queue-name {} --name {} --subscription {}".format(
                            namespace,
                            ep.get("resourceGroup"),
                            endpoint_props["EntityPath"],
                            endpoint_props["SharedAccessKeyName"],
                            ep.get("subscriptionId"),
                        )
                    ).as_json()["primaryConnectionString"]
                except AzCLIError:
                    logger.warning(usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Service Bus Queue", ep["name"]))
                    removed_endpoints.append(ep["name"])
                    continue
            elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
                # NOTE(review): no `continue` here (see Cosmos DB note above).
                logger.warning(
                    usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
                        "Service Bus Queue", ep["name"], ep["identity"].get("userAssignedIdentity")
                    )
                )
                removed_endpoints.append(ep["name"])
            else:
                namespace = self._hostname_prefix(ep["endpointUri"])
                success = cli.invoke(
                    "servicebus queue show --namespace-name {} --resource-group {} "
                    "--name {} --subscription {}".format(
                        namespace, ep.get("resourceGroup"), ep["entityPath"], ep.get("subscriptionId")
                    )
                ).success()
                if not success:
                    logger.warning(
                        usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Service Bus Queue", ep["name"])
                    )
                    removed_endpoints.append(ep["name"])
                    continue
            servicebus_queue_endpoints.append(ep)
        endpoints["serviceBusQueues"] = servicebus_queue_endpoints

        # Service Bus Topic
        servicebus_topic_endpoints = []
        for ep in endpoints["serviceBusTopics"]:
            if ep.get("connectionString"):
                endpoint_props = parse_iot_hub_message_endpoint_connection_string(ep["connectionString"])
                namespace = self._hostname_prefix(endpoint_props["Endpoint"])
                try:
                    ep["connectionString"] = cli.invoke(
                        "servicebus topic authorization-rule keys list --namespace-name {} --resource-group {} "
                        "--topic-name {} --name {} --subscription {}".format(
                            namespace,
                            ep.get("resourceGroup"),
                            endpoint_props["EntityPath"],
                            endpoint_props["SharedAccessKeyName"],
                            ep.get("subscriptionId"),
                        )
                    ).as_json()["primaryConnectionString"]
                except AzCLIError:
                    logger.warning(
                        usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Service Bus Topic", ep["name"])
                    )
                    removed_endpoints.append(ep["name"])
                    continue
            elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
                # NOTE(review): no `continue` here (see Cosmos DB note above).
                logger.warning(
                    usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
                        "Service Bus Topic", ep["name"], ep["identity"].get("userAssignedIdentity")
                    )
                )
                removed_endpoints.append(ep["name"])
            else:
                namespace = self._hostname_prefix(ep["endpointUri"])
                success = cli.invoke(
                    "servicebus topic show --namespace-name {} --resource-group {} "
                    "--name {} --subscription {}".format(
                        namespace, ep.get("resourceGroup"), ep["entityPath"], ep.get("subscriptionId")
                    )
                ).success()
                if not success:
                    logger.warning(
                        usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Service Bus Topic", ep["name"])
                    )
                    removed_endpoints.append(ep["name"])
                    continue
            servicebus_topic_endpoints.append(ep)
        endpoints["serviceBusTopics"] = servicebus_topic_endpoints

        # Storage Account
        storage_endpoints = []
        for ep in endpoints["storageContainers"]:
            if ep.get("connectionString"):
                endpoint_props = parse_storage_container_connection_string(ep["connectionString"])
                try:
                    ep["connectionString"] = cli.invoke(
                        "storage account show-connection-string -n {} -g {} --subscription {}".format(
                            endpoint_props["AccountName"], ep.get("resourceGroup"), ep.get("subscriptionId")
                        )
                    ).as_json()["connectionString"]
                except AzCLIError:
                    logger.warning(
                        usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Storage Container", ep["name"])
                    )
                    removed_endpoints.append(ep["name"])
                    continue
            elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
                # NOTE(review): no `continue` here (see Cosmos DB note above).
                logger.warning(
                    usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
                        "Storage Container", ep["name"], ep["identity"].get("userAssignedIdentity")
                    )
                )
                removed_endpoints.append(ep["name"])
            else:
                account_name = self._hostname_prefix(ep["endpointUri"])
                success = cli.invoke(
                    "storage account show --name {} --subscription {}".format(
                        account_name, ep.get("subscriptionId"),
                    )
                ).success()
                if not success:
                    logger.warning(
                        usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Storage Container", ep["name"])
                    )
                    removed_endpoints.append(ep["name"])
                    continue
            storage_endpoints.append(ep)
        endpoints["storageContainers"] = storage_endpoints

        # Go through routes to remove any that use removed endpoints.
        # (The previous code built this list twice; the first pass was dead code.)
        routes = hub_resource["properties"]["routing"]["routes"]
        filtered_routes = []
        for route in routes:
            if route["endpointNames"][0] in removed_endpoints:
                logger.warning(usr_msgs.SAVE_ROUTE_FAIL_MSG.format(route["name"], route["endpointNames"][0]))
            else:
                filtered_routes.append(route)
        hub_resource["properties"]["routing"]["routes"] = filtered_routes

        # File upload
        file_upload = hub_resource["properties"]["storageEndpoints"].get("$default", {})
        if (
            isinstance(file_upload.get("identity"), dict)
            and file_upload["identity"].get("userAssignedIdentity") in removed_identities
        ):
            logger.warning(
                usr_msgs.SAVE_FILE_UPLOAD_UAI_RETRIEVE_FAIL_MSG.format(
                    file_upload["identity"].get("userAssignedIdentity")
                )
            )
            file_upload["authenticationType"] = None
            file_upload["connectionString"] = None
            file_upload["containerName"] = None
            file_upload["identity"] = None
        elif file_upload.get("connectionString"):
            endpoint_props = parse_storage_container_connection_string(file_upload["connectionString"])
            try:
                file_upload["connectionString"] = cli.invoke(
                    "storage account show-connection-string -n {}".format(
                        endpoint_props["AccountName"]
                    )
                ).as_json()["connectionString"]
            except AzCLIError:
                logger.warning(usr_msgs.SAVE_FILE_UPLOAD_RETRIEVE_FAIL_MSG)
                file_upload["authenticationType"] = None
                file_upload["connectionString"] = None
                file_upload["containerName"] = None

    # Upload commands
    def upload_device_identity(self, device_id: str, identity: dict):
        """Create a device identity on the destination hub from a saved identity dict."""
        auth_type = identity["authentication"]["type"]
        edge = identity["capabilities"]["iotEdge"]
        status = identity["status"]
        ptp = identity["authentication"]["x509Thumbprint"]["primaryThumbprint"]
        stp = identity["authentication"]["x509Thumbprint"]["secondaryThumbprint"]
        # Fix: the original checked for "status_reason" but read "statusReason",
        # so a saved status reason was never (successfully) picked up.
        status_reason = identity.get("statusReason")

        if auth_type == DeviceAuthApiType.sas.value:
            pk = identity["authentication"]["symmetricKey"]["primaryKey"]
            sk = identity["authentication"]["symmetricKey"]["secondaryKey"]
            _iot_device_create(
                target=self.target,
                device_id=device_id,
                edge_enabled=edge,
                primary_key=pk,
                secondary_key=sk,
                status=status,
                status_reason=status_reason,
            )
        elif auth_type == DeviceAuthApiType.selfSigned.value:
            _iot_device_create(
                target=self.target,
                device_id=device_id,
                edge_enabled=edge,
                auth_method=DeviceAuthType.x509_thumbprint.value,
                primary_thumbprint=ptp,
                secondary_thumbprint=stp,
                status=status,
                status_reason=status_reason,
            )
        elif auth_type == DeviceAuthApiType.certificateAuthority.value:
            _iot_device_create(
                target=self.target,
                device_id=device_id,
                edge_enabled=edge,
                auth_method=DeviceAuthType.x509_ca.value,
                primary_thumbprint=ptp,
                secondary_thumbprint=stp,
                status=status,
                status_reason=status_reason,
            )
        else:
            logger.error(usr_msgs.BAD_DEVICE_AUTHORIZATION_MSG.format(device_id))
        # Verify the device now exists on the hub (raises if creation failed).
        _iot_device_show(target=self.target, device_id=device_id)

    def upload_module_identity(self, device_id: str, module_id: str, identity: dict):
        """Create a module identity on the destination hub from a saved identity dict."""
        auth_type = identity["authentication"]["type"]

        if auth_type == DeviceAuthApiType.sas.value:
            pk = identity["authentication"]["symmetricKey"]["primaryKey"]
            sk = identity["authentication"]["symmetricKey"]["secondaryKey"]
            _iot_device_module_create(
                target=self.target,
                device_id=device_id,
                module_id=module_id,
                primary_key=pk,
                secondary_key=sk,
            )
        elif auth_type == DeviceAuthApiType.selfSigned.value:
            ptp = identity["authentication"]["x509Thumbprint"]["primaryThumbprint"]
            stp = identity["authentication"]["x509Thumbprint"]["secondaryThumbprint"]
            _iot_device_module_create(
                target=self.target,
                device_id=device_id,
                module_id=module_id,
                auth_method=DeviceAuthType.x509_thumbprint.value,
                primary_thumbprint=ptp,
                secondary_thumbprint=stp,
            )
        elif auth_type == DeviceAuthApiType.certificateAuthority.value:
            _iot_device_module_create(
                target=self.target,
                device_id=device_id,
                module_id=module_id,
                auth_method=DeviceAuthType.x509_ca.value,
            )
        else:
            logger.error(usr_msgs.BAD_DEVICE_MODULE_AUTHORIZATION_MSG.format(module_id, device_id))

    # Delete Commands
    def delete_all_certificates(self):
        """Delete all certs if possible."""
        cert_client = self._get_client().certificates
        # serialize strips name and etag - use as_dict instead
        certificates = cert_client.list_by_iot_hub(self.rg, self.hub_name).as_dict()
        for cert in tqdm(certificates["value"], desc=usr_msgs.DELETE_CERT_DESC, ascii=" #"):
            cert_client.delete(self.rg, self.hub_name, cert["name"], cert["etag"])

    def delete_all_configs(self):
        """Delete all configs if possible."""
        # Basic tier does not support list config
        try:
            all_configs = _iot_hub_configuration_list(target=self.target)
        except AzCLIError:
            logger.warning(usr_msgs.SKIP_CONFIGURATION_DELETE_MSG)
            return
        for config in tqdm(all_configs, desc=usr_msgs.DELETE_CONFIGURATION_DESC, ascii=" #"):
            try:
                _iot_hub_configuration_delete(target=self.target, config_id=config["id"])
            except ResourceNotFoundError:
                logger.warning(usr_msgs.DELETE_CONFIGURATION_FAILURE_MSG.format(config["id"]))

    def delete_all_devices(self):
        """Delete all devices if possible."""
        identities = _iot_device_twin_list(target=self.target, top=None)
        for d in tqdm(identities, desc=usr_msgs.DELETE_DEVICES_DESC, ascii=" #"):
            try:
                _iot_device_delete(target=self.target, device_id=d["deviceId"])
            except ResourceNotFoundError:
                logger.warning(usr_msgs.DELETE_DEVICES_FAILURE_MSG.format(d["deviceId"]))