azext_iot/operations/hub.py (2,622 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from os.path import exists from knack.log import get_logger from enum import Enum, EnumMeta from tqdm import tqdm from azure.cli.core.azclierror import ( ArgumentUsageError, CLIInternalError, ClientRequestError, FileOperationError, InvalidArgumentValueError, RequiredArgumentMissingError, ResourceNotFoundError, ValidationError, ) from azext_iot.constants import ( DEVICE_DEVICESCOPE_PREFIX, IOTHUB_RENEW_KEY_BATCH_SIZE, IOTHUB_THROTTLE_MAX_TRIES, IOTHUB_THROTTLE_SLEEP_SEC, THROTTLE_HTTP_STATUS_CODE, TRACING_PROPERTY, TRACING_ALLOWED_FOR_LOCATION, TRACING_ALLOWED_FOR_SKU, ) from azext_iot.common.sas_token_auth import SasTokenAuthentication from azext_iot.common.shared import ( DeviceAuthType, SdkType, ConfigType, KeyType, RenewKeyType, IoTHubStateType, DeviceAuthApiType, ConnectionStringParser, EntityStatusType, JobType ) from azext_iot.iothub.providers.discovery import IotHubDiscovery from azext_iot.common.utility import ( assemble_nargs_to_dict, handle_service_exception, read_file_content, init_monitoring, process_json_arg, generate_storage_account_sas_token, ) from azext_iot._factory import SdkResolver, CloudError from azext_iot.operations.generic import _execute_query from typing import Optional import pprint logger = get_logger(__name__) printer = pprint.PrettyPrinter(indent=2) # Query def iot_query( cmd, query_command, hub_name_or_hostname=None, top=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_query(target, query_command, top) def _iot_query(target, query_command, top=None): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: query_args = [query_command] query_method = service_sdk.query.get_twins return _execute_query(query_args, query_method, top) except CloudError as e: handle_service_exception(e) # Device def iot_device_show( cmd, device_id, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_show(target, device_id) def _iot_device_show(target, device_id): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: device = service_sdk.devices.get_identity( id=device_id, raw=True ).response.json() device["hub"] = target.get("entity") return device except CloudError as e: handle_service_exception(e) def iot_device_create( cmd, device_id, hub_name_or_hostname=None, edge_enabled=False, auth_method=DeviceAuthType.shared_private_key.value, primary_key=None, secondary_key=None, primary_thumbprint=None, secondary_thumbprint=None, status=EntityStatusType.enabled.value, status_reason=None, valid_days=None, output_dir=None, device_scope=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_create( target=target, device_id=device_id, edge_enabled=edge_enabled, auth_method=auth_method, primary_key=primary_key, secondary_key=secondary_key, primary_thumbprint=primary_thumbprint, secondary_thumbprint=secondary_thumbprint, status=status, status_reason=status_reason, valid_days=valid_days, output_dir=output_dir, device_scope=device_scope ) def _iot_device_create( target, device_id, edge_enabled=False, auth_method=DeviceAuthType.shared_private_key.value, primary_key=None, secondary_key=None, primary_thumbprint=None, secondary_thumbprint=None, status=EntityStatusType.enabled.value, status_reason=None, valid_days=None, output_dir=None, device_scope=None, ): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) if any([valid_days, output_dir]): valid_days = 365 if not valid_days else int(valid_days) if output_dir and not exists(output_dir): raise FileOperationError( "certificate output directory of '{}' does not exist.".format( output_dir ) ) cert = _create_self_signed_cert(device_id, valid_days, output_dir) primary_thumbprint = cert["thumbprint"] try: device = _assemble_device( is_update=False, device_id=device_id, auth_method=auth_method, edge_enabled=edge_enabled, pk=primary_thumbprint if auth_method == DeviceAuthType.x509_thumbprint.value else primary_key, sk=secondary_thumbprint if auth_method == DeviceAuthType.x509_thumbprint.value else secondary_key, status=status, status_reason=status_reason, device_scope=device_scope, ) output = service_sdk.devices.create_or_update_identity( id=device_id, device=device ) except CloudError as e: handle_service_exception(e) except ValueError as ve: raise InvalidArgumentValueError(ve) return output def _assemble_device( is_update, device_id, auth_method, edge_enabled, pk=None, sk=None, status=EntityStatusType.enabled.value, status_reason=None, device_scope=None, ): from azext_iot.sdk.iothub.service.models import DeviceCapabilities, Device auth = _assemble_auth(auth_method, pk, sk) cap = DeviceCapabilities(iot_edge=edge_enabled) if is_update: device = Device( device_id=device_id, authentication=auth, capabilities=cap, status=status, status_reason=status_reason, device_scope=device_scope, ) return device if edge_enabled: parent_scopes = [] if device_scope: parent_scopes = [device_scope] device = Device( device_id=device_id, authentication=auth, capabilities=cap, status=status, status_reason=status_reason, parent_scopes=parent_scopes, ) return device else: device = Device( device_id=device_id, authentication=auth, capabilities=cap, status=status, status_reason=status_reason, device_scope=device_scope, ) return device def _assemble_auth(auth_method, pk, sk): from azext_iot.sdk.iothub.service.models import ( AuthenticationMechanism, SymmetricKey, X509Thumbprint, ) auth = None if auth_method in [ DeviceAuthType.shared_private_key.name, DeviceAuthApiType.sas.value, ]: if any([pk, sk]) and not all([pk, sk]): raise ValueError("When configuring symmetric key auth both primary and secondary keys are required.") auth = AuthenticationMechanism( symmetric_key=SymmetricKey(primary_key=pk, secondary_key=sk), type=DeviceAuthApiType.sas.value, ) elif auth_method in [ DeviceAuthType.x509_thumbprint.name, DeviceAuthApiType.selfSigned.value, ]: if not pk: raise ValueError("When configuring selfSigned auth the primary thumbprint is required.") auth = AuthenticationMechanism( x509_thumbprint=X509Thumbprint( primary_thumbprint=pk, secondary_thumbprint=sk ), type=DeviceAuthApiType.selfSigned.value, ) elif auth_method in [ DeviceAuthType.x509_ca.name, DeviceAuthApiType.certificateAuthority.value, ]: auth = AuthenticationMechanism( type=DeviceAuthApiType.certificateAuthority.value ) else: raise ValueError("Authorization method {} invalid.".format(auth_method)) return auth def _create_self_signed_cert(subject, valid_days, output_path=None): from azext_iot.common.certops import create_self_signed_certificate return create_self_signed_certificate(subject=subject, valid_days=valid_days, cert_output_dir=output_path) def update_iot_device_custom( instance, edge_enabled=None, status=None, status_reason=None, auth_method=None, primary_thumbprint=None, secondary_thumbprint=None, primary_key=None, secondary_key=None, ): if edge_enabled is not None: instance["capabilities"]["iotEdge"] = edge_enabled if status is not None: instance["status"] = status if status_reason is not None: instance["statusReason"] = status_reason auth_type = instance["authentication"]["type"] if auth_method is not None: if auth_method == DeviceAuthType.shared_private_key.name: auth = DeviceAuthApiType.sas.value if (primary_key and not secondary_key) or ( not primary_key and secondary_key ): raise RequiredArgumentMissingError("primary + secondary Key required with sas auth") instance["authentication"]["symmetricKey"]["primaryKey"] = primary_key instance["authentication"]["symmetricKey"]["secondaryKey"] = secondary_key elif auth_method == DeviceAuthType.x509_thumbprint.name: auth = DeviceAuthApiType.selfSigned.value if not any([primary_thumbprint, secondary_thumbprint]): raise RequiredArgumentMissingError( "primary or secondary Thumbprint required with selfSigned auth" ) if primary_thumbprint: instance["authentication"]["x509Thumbprint"][ "primaryThumbprint" ] = primary_thumbprint if secondary_thumbprint: instance["authentication"]["x509Thumbprint"][ "secondaryThumbprint" ] = secondary_thumbprint elif auth_method == DeviceAuthType.x509_ca.name: auth = DeviceAuthApiType.certificateAuthority.value else: raise ValueError("Authorization method {} invalid.".format(auth_method)) instance["authentication"]["type"] = auth # if no new auth_method is provided, validate secondary auth arguments and update accordingly elif auth_type == DeviceAuthApiType.sas.value: if any([primary_thumbprint, secondary_thumbprint]): raise ValueError( "Device authorization method {} does not support primary or secondary thumbprints.".format( DeviceAuthType.shared_private_key.name ) ) if primary_key: instance["authentication"]["symmetricKey"]["primaryKey"] = primary_key if secondary_key: instance["authentication"]["symmetricKey"]["secondaryKey"] = secondary_key elif auth_type == DeviceAuthApiType.selfSigned.value: if any([primary_key, secondary_key]): raise ValueError( "Device authorization method {} does not support primary or secondary keys.".format( DeviceAuthType.x509_thumbprint.name ) ) if primary_thumbprint: instance["authentication"]["x509Thumbprint"][ "primaryThumbprint" ] = primary_thumbprint if secondary_thumbprint: instance["authentication"]["x509Thumbprint"][ "secondaryThumbprint" ] = secondary_thumbprint return instance def iot_device_update( cmd, device_id, parameters, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) auth, pk, sk = _parse_auth(parameters) updated_device = _assemble_device( True, parameters["deviceId"], auth, parameters["capabilities"]["iotEdge"], pk, sk, parameters["status"].lower(), parameters.get("statusReason"), parameters.get("deviceScope"), ) updated_device.etag = etag if etag else "*" return _iot_device_update(target, device_id, updated_device) def _iot_device_update(target, device_id, device): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: headers = {} headers["If-Match"] = '"{}"'.format(device.etag) return service_sdk.devices.create_or_update_identity( id=device_id, device=device, custom_headers=headers ) except CloudError as e: handle_service_exception(e) def iot_device_delete( cmd, device_id, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_delete(target, device_id, etag) def _iot_device_delete(target, device_id, etag=None): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") service_sdk.devices.delete_identity(id=device_id, custom_headers=headers) return except CloudError as e: handle_service_exception(e) def _update_device_key(target, device, auth_method, pk, sk, etag=None): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: auth = _assemble_auth(auth_method, pk, sk) device["authentication"] = auth headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") return service_sdk.devices.create_or_update_identity( id=device["deviceId"], device=device, custom_headers=headers, ) except CloudError as e: handle_service_exception(e) def iot_device_key_regenerate( cmd, hub_name_or_hostname, device_ids, renew_key_type, include_modules=False, no_progress=False, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) if renew_key_type == RenewKeyType.swap.value: if len(device_ids) > 1 or device_ids[0] == "*": raise InvalidArgumentValueError( "Currently, bulk key swap is not supported." ) device = _iot_device_show(target, device_ids[0]) if device["authentication"]["type"] != DeviceAuthApiType.sas.value: raise ClientRequestError("Device authentication should be of type sas") pk = device["authentication"]["symmetricKey"]["primaryKey"] sk = device["authentication"]["symmetricKey"]["secondaryKey"] temp = pk pk = sk sk = temp return _update_device_key( target, device, device["authentication"]["type"], pk, sk, etag ) resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) if renew_key_type in [RenewKeyType.primary.value, RenewKeyType.secondary.value]: renew_key_type += "Key" modules = [] devices = [] if device_ids[0] == "*": device_twins = _iot_device_twin_list(target=target, top=None) device_twins.extend(_iot_device_twin_list(target=target, edge_enabled=True, top=None)) for device in device_twins: if device["authenticationType"] == DeviceAuthApiType.sas.value: devices.append({"id": device["deviceId"]}) # non sas devices can have sas modules... if include_modules: modules.extend( _iot_key_regenerate_process_modules( target=target, device_id=device["deviceId"], module_ids="*" ) ) else: devices = [{"id": device_id} for device_id in device_ids] if modules: logger.info(f"Found {len(modules)} modules.") result = _iot_key_regenerate_batch( service_sdk=service_sdk, renew_key_type=renew_key_type, items=devices + modules, no_progress=no_progress ) # avoid breaking changes by having one device return the device identity if all({len(device_ids) == 1, device_ids[0] != "*", not include_modules}): return _iot_device_show(target, device_ids[0]) return result def iot_device_get_parent( cmd, device_id, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) child_device = _iot_device_show(target, device_id) _validate_child_device(child_device) parent_scope = child_device["parentScopes"][0] parent_device_id = parent_scope[ len(DEVICE_DEVICESCOPE_PREFIX) : parent_scope.rindex("-") ] return _iot_device_show(target, parent_device_id) def iot_device_set_parent( cmd, device_id, parent_id, force=False, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) _iot_device_set_parent(target, parent_id, device_id, force) def _iot_device_set_parent(target, parent_id, device_id, force=False): parent_device = _iot_device_show(target, parent_id) _validate_edge_device(parent_device) child_device = _iot_device_show(target, device_id) _validate_parent_child_relation(child_device, force) _update_device_parent( target, child_device, child_device["capabilities"]["iotEdge"], parent_device["deviceScope"], ) def iot_device_children_add( cmd, device_id, child_list, force=False, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_children_add(target, device_id, child_list, force) def _iot_device_children_add( target, device_id, child_list, force=False ): devices = [] edge_device = _iot_device_show(target, device_id) _validate_edge_device(edge_device) converted_child_list = child_list for child_device_id in converted_child_list: child_device = _iot_device_show(target, child_device_id.strip()) _validate_parent_child_relation(child_device, force) devices.append(child_device) for device in devices: _update_device_parent( target, device, device["capabilities"]["iotEdge"], edge_device["deviceScope"], ) def iot_device_children_remove( cmd, device_id, child_list=None, remove_all=False, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) devices = [] if remove_all: result = _iot_device_children_list(target, device_id) if not result: raise ClientRequestError( 'No registered child devices found for "{}" edge device.'.format( device_id ) ) for child_device_id in [str(x["deviceId"]) for x in result]: child_device = _iot_device_show(target, child_device_id.strip()) devices.append(child_device) elif child_list: edge_device = _iot_device_show(target, device_id) _validate_edge_device(edge_device) converted_child_list = child_list for child_device_id in converted_child_list: child_device = _iot_device_show(target, child_device_id.strip()) _validate_child_device(child_device) if child_device["parentScopes"] == [edge_device["deviceScope"]]: devices.append(child_device) else: raise ClientRequestError( 'The entered child device "{}" isn\'t assigned as a child of edge device "{}"'.format( child_device_id.strip(), device_id ) ) else: raise RequiredArgumentMissingError( "Please specify child list or use --remove-all to remove all children." ) for device in devices: _update_device_parent(target, device, device["capabilities"]["iotEdge"]) def iot_device_children_list( cmd, device_id, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) result = _iot_device_children_list(target, device_id) return [device["deviceId"] for device in result] def _iot_device_children_list(target, device_id): device = _iot_device_show(target, device_id) _validate_edge_device(device) query = ( "select deviceId from devices where array_contains(parentScopes, '{}')".format( device["deviceScope"] ) ) # TODO: Inefficient return _iot_query(target, query) def _update_device_parent(target, device, is_edge, device_scope=None): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: if is_edge: parent_scopes = [] if device_scope: parent_scopes = [device_scope] device["parentScopes"] = parent_scopes else: if not device_scope: device_scope = "" device["deviceScope"] = device_scope etag = device.get("etag", None) if etag: headers = {} headers["If-Match"] = '"{}"'.format(etag) service_sdk.devices.create_or_update_identity( id=device["deviceId"], device=device, custom_headers=headers, ) return raise LookupError("device etag not found.") except CloudError as e: handle_service_exception(e) except LookupError as err: raise CLIInternalError(err) def _validate_edge_device(device): if not device["capabilities"]["iotEdge"]: raise ClientRequestError( 'The device "{}" should be an edge device.'.format(device["deviceId"]) ) def _validate_child_device(device): if "parentScopes" not in device: raise ClientRequestError( 'Device "{}" doesn\'t support parent device functionality.'.format( device["deviceId"] ) ) if not device["parentScopes"]: raise ClientRequestError( 'Device "{}" doesn\'t have any parent device.'.format(device["deviceId"]) ) def _validate_parent_child_relation(child_device, force): if "parentScopes" not in child_device or child_device["parentScopes"] == []: return else: if not force: raise ClientRequestError( "The entered device \"{}\" already has a parent device, please use '--force'" " to overwrite".format(child_device["deviceId"]) ) return # Module def iot_device_module_create( cmd, device_id, module_id, hub_name_or_hostname=None, auth_method=DeviceAuthType.shared_private_key.value, primary_key=None, secondary_key=None, primary_thumbprint=None, secondary_thumbprint=None, valid_days=None, output_dir=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): if any([valid_days, output_dir]): valid_days = 365 if not valid_days else int(valid_days) if output_dir and not exists(output_dir): raise FileOperationError( "certificate output directory of '{}' does not exist.".format( output_dir ) ) cert = _create_self_signed_cert(module_id, valid_days, output_dir) primary_thumbprint = cert["thumbprint"] discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_module_create( target=target, device_id=device_id, module_id=module_id, auth_method=auth_method, primary_key=primary_key, secondary_key=secondary_key, primary_thumbprint=primary_thumbprint, secondary_thumbprint=secondary_thumbprint ) def _iot_device_module_create( target, device_id, module_id, auth_method=DeviceAuthType.shared_private_key.value, primary_key=None, secondary_key=None, primary_thumbprint=None, secondary_thumbprint=None ): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: module = _assemble_module( device_id=device_id, module_id=module_id, auth_method=auth_method, pk=primary_thumbprint if auth_method == DeviceAuthType.x509_thumbprint.value else primary_key, sk=secondary_thumbprint if auth_method == DeviceAuthType.x509_thumbprint.value else secondary_key, ) return service_sdk.modules.create_or_update_identity( id=device_id, mid=module_id, module=module ) except CloudError as e: handle_service_exception(e) except ValueError as ve: raise InvalidArgumentValueError(ve) def _assemble_module(device_id, module_id, auth_method, pk=None, sk=None): from azext_iot.sdk.iothub.service.models import Module auth = _assemble_auth(auth_method, pk, sk) module = Module(module_id=module_id, device_id=device_id, authentication=auth) return module def iot_device_module_update( cmd, device_id, module_id, parameters, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: updated_module = _handle_module_update_params(parameters) headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") return service_sdk.modules.create_or_update_identity( id=device_id, mid=module_id, module=updated_module, custom_headers=headers, ) except CloudError as e: handle_service_exception(e) def _handle_module_update_params(parameters): auth, pk, sk = _parse_auth(parameters) return _assemble_module( device_id=parameters["deviceId"], module_id=parameters["moduleId"], auth_method=auth, pk=pk, sk=sk, ) def _parse_auth(parameters): valid_auth = [ DeviceAuthApiType.sas.value, DeviceAuthApiType.selfSigned.value, DeviceAuthApiType.certificateAuthority.value, ] auth = parameters["authentication"].get("type") if auth not in valid_auth: raise InvalidArgumentValueError("authentication.type must be one of {}".format(valid_auth)) pk = sk = None if auth == DeviceAuthApiType.sas.value: pk = parameters["authentication"]["symmetricKey"]["primaryKey"] sk = parameters["authentication"]["symmetricKey"]["secondaryKey"] elif auth == DeviceAuthApiType.selfSigned.value: pk = parameters["authentication"]["x509Thumbprint"]["primaryThumbprint"] sk = parameters["authentication"]["x509Thumbprint"]["secondaryThumbprint"] if not any([pk, sk]): raise RequiredArgumentMissingError( "primary + secondary Thumbprint required with selfSigned auth" ) return auth, pk, sk def iot_device_module_key_regenerate( cmd, hub_name_or_hostname, device_id, module_ids, renew_key_type, no_progress=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) if renew_key_type == RenewKeyType.swap.value: if len(module_ids) > 1 or module_ids[0] == "*": raise InvalidArgumentValueError( "Currently, bulk key swap is not supported." ) module = _iot_device_module_show(target=target, device_id=device_id, module_id=module_ids[0]) if module["authentication"]["type"] != DeviceAuthApiType.sas.value: raise ClientRequestError("Module authentication should be of type sas") pk = module["authentication"]["symmetricKey"]["primaryKey"] sk = module["authentication"]["symmetricKey"]["secondaryKey"] temp = pk module["authentication"]["symmetricKey"]["primaryKey"] = sk module["authentication"]["symmetricKey"]["secondaryKey"] = temp try: return service_sdk.modules.create_or_update_identity( id=device_id, mid=module_ids[0], module=module, custom_headers={ "If-Match": '"{}"'.format(etag if etag else "*") }, ) except CloudError as e: handle_service_exception(e) if renew_key_type in [RenewKeyType.primary.value, RenewKeyType.secondary.value]: renew_key_type += "Key" modules = _iot_key_regenerate_process_modules( target, device_id, module_ids ) result = _iot_key_regenerate_batch( service_sdk=service_sdk, renew_key_type=renew_key_type, items=modules, device_id=device_id, no_progress=no_progress ) if len(module_ids) == 1 and module_ids[0] != "*": return _iot_device_module_show(target=target, device_id=device_id, module_id=module_ids[0]) return result def _iot_key_regenerate_process_modules( target, device_id, module_ids, ): if module_ids[0] == "*": modules = _iot_device_module_list(target=target, device_id=device_id, top=None) module_ids = [] for module in modules: # not going to question why the call the Module object - just making things # easier for unit tests if not isinstance(module, dict): module = module.serialize() if module["authentication"]["type"] == DeviceAuthApiType.sas.value: module_ids.append(module["moduleId"]) return [{"id": device_id, "moduleId": module_id} for module_id in module_ids] def _iot_key_regenerate_batch( service_sdk, renew_key_type, items, device_id=None, no_progress=False, ): from time import sleep overall_result = { "policyKey": renew_key_type, "errors": [], "rotatedKeys": [] } starting_msg = f"modules for device {device_id}" if device_id else "devices" logger.info(f"Found {len(items)} {starting_msg}.") if not items: return {} batches = [] while items: if len(items) > IOTHUB_RENEW_KEY_BATCH_SIZE: batches.append(items[:IOTHUB_RENEW_KEY_BATCH_SIZE]) items = items[IOTHUB_RENEW_KEY_BATCH_SIZE:] else: batches.append(items[:]) items = [] for batch in tqdm(batches, desc="Bulk key regeneration is in progress", ascii=' #', disable=no_progress): # call result = None tries = 0 while tries < IOTHUB_THROTTLE_MAX_TRIES: try: result = service_sdk.service.bulk_regenerate_device_key_method( policy_key=renew_key_type, devices=batch ) break except CloudError as e: tries += 1 if tries == IOTHUB_THROTTLE_MAX_TRIES or e.status_code != THROTTLE_HTTP_STATUS_CODE: if overall_result["rotatedKeys"]: logger.warning( f"Managed to renew the following keys:\n{overall_result['rotatedKeys']}" ) handle_service_exception(e) sleep(IOTHUB_THROTTLE_SLEEP_SEC) # combine result if result.errors: overall_result["errors"].extend(result.errors) if result.rotated_keys: overall_result["rotatedKeys"].extend(result.rotated_keys) return overall_result def iot_device_module_list( cmd, device_id, hub_name_or_hostname=None, top=1000, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_module_list(target, device_id, top) def _iot_device_module_list(target, device_id, top=1000): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: return service_sdk.modules.get_modules_on_device(device_id)[:top] except CloudError as e: handle_service_exception(e) def iot_device_module_show( cmd, device_id, module_id, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_module_show(target, device_id, module_id) def _iot_device_module_show(target, device_id, module_id): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: module = service_sdk.modules.get_identity( id=device_id, mid=module_id, raw=True ).response.json() module["hub"] = target.get("entity") return module except CloudError as e: handle_service_exception(e) def iot_device_module_delete( cmd, device_id, module_id, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") service_sdk.modules.delete_identity( id=device_id, mid=module_id, custom_headers=headers ) return except CloudError as e: handle_service_exception(e) def iot_device_module_twin_show( cmd, device_id, module_id, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_module_twin_show( target=target, device_id=device_id, module_id=module_id ) def _iot_device_module_twin_show(target, device_id, module_id): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: return service_sdk.modules.get_twin( id=device_id, mid=module_id, raw=True ).response.json() except CloudError as e: handle_service_exception(e) def iot_device_module_twin_update( cmd, device_id, module_id, parameters, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_module_twin_update(target, device_id, module_id, parameters, etag) def _iot_device_module_twin_update(target, device_id, module_id, parameters, etag=None): from azext_iot.common.utility import verify_transform resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") verify = {} if parameters.get("properties"): if parameters["properties"].get("desired"): verify = {"properties.desired": dict} if parameters.get("tags"): verify["tags"] = dict verify_transform(parameters, verify) return service_sdk.modules.update_twin( id=device_id, mid=module_id, device_twin_info=parameters, custom_headers=headers, ) except CloudError as e: handle_service_exception(e) except (AttributeError, TypeError) as err: raise CLIInternalError(err) def iot_device_module_twin_replace( cmd, device_id, module_id, target_json, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_module_twin_replace(target, device_id, module_id, target_json, etag) def _iot_device_module_twin_replace(target, device_id, module_id, target_json, etag=None): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: target_json = process_json_arg(target_json, argument_name="json") headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") return service_sdk.modules.replace_twin( id=device_id, mid=module_id, device_twin_info=target_json, custom_headers=headers, ) except CloudError as e: handle_service_exception(e) def iot_edge_set_modules( cmd, device_id, content, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_edge_set_modules(target, device_id, content) def _iot_edge_set_modules(target, device_id, content): from azext_iot.sdk.iothub.service.models import ConfigurationContent resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: content = process_json_arg(content, argument_name="content") processed_content = _process_config_content( content, config_type=ConfigType.edge ) content = ConfigurationContent(**processed_content) service_sdk.configuration.apply_on_edge_device(id=device_id, content=content) return _iot_device_module_list(target, device_id) except CloudError as e: handle_service_exception(e) def iot_edge_export_modules( cmd, device_id, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) module_twin_list = [] try: # Get all modules in the device module_list = iot_device_module_list(cmd, device_id, hub_name_or_hostname=hub_name_or_hostname, login=login) for module in module_list: # Get module twins using module ids module_twin = _iot_device_module_twin_show( target=target, device_id=device_id, module_id=module.module_id) module_twin_list.append(module_twin) # Turn module twins list into module twin configuration return _build_edge_modules_configuration(module_twin_list) except CloudError as e: handle_service_exception(e) def _build_edge_modules_configuration(module_twin_list): modulesContent = {} for module_twin in module_twin_list: moduleId = module_twin["moduleId"] desiredProperties = module_twin["properties"]["desired"] # Add desired properties from module twin except $metadata and $version if desiredProperties: desiredProperties.pop("$metadata") desiredProperties.pop("$version") modulesContent[moduleId] = {"properties.desired": desiredProperties} return {"content": {"modulesContent": modulesContent}} def iot_edge_deployment_create( cmd, config_id, content, custom_labels=None, custom_metric_queries=None, hub_name_or_hostname=None, target_condition="", priority=0, labels=None, metrics=None, layered=False, no_validation=False, resource_group_name=None, login=None, auth_type_dataplane=None, ): # Short-term fix for --no-validation config_type = ConfigType.layered if layered or no_validation else ConfigType.edge discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_hub_configuration_create( target=target, config_id=config_id, content=content, custom_labels=custom_labels, custom_metric_queries=custom_metric_queries, target_condition=target_condition, priority=priority, labels=labels, metrics=metrics, config_type=config_type ) def iot_hub_configuration_create( cmd, config_id, content, custom_labels=None, custom_metric_queries=None, hub_name_or_hostname=None, target_condition="", priority=0, labels=None, metrics=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_hub_configuration_create( target=target, config_id=config_id, content=content, custom_labels=custom_labels, custom_metric_queries=custom_metric_queries, target_condition=target_condition, priority=priority, labels=labels, metrics=metrics, config_type=ConfigType.adm ) def _iot_hub_configuration_create( target, config_id, content, config_type=ConfigType.adm, custom_labels=None, custom_metric_queries=None, target_condition="", priority=0, labels=None, metrics=None ): from azext_iot.sdk.iothub.service.models import ( Configuration, ConfigurationContent, ConfigurationMetrics, ) resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) logger.debug("ensuring lowercase configuration Id...") config_id = config_id.lower() metrics_key = "queries" content = process_json_arg(content, argument_name="content") processed_content = _process_config_content(content, config_type) if "module_content" in processed_content: required_target_prefix = "from devices.modules where" lower_target_condition = target_condition.lower() if not lower_target_condition.startswith(required_target_prefix): raise InvalidArgumentValueError( "The target condition for a module configuration must start with '{}'".format( required_target_prefix ) ) if metrics: metrics = process_json_arg(metrics, argument_name="metrics") if "metrics" in metrics: metrics = metrics["metrics"] if metrics_key not in metrics: raise InvalidArgumentValueError( "metrics json must include the '{}' property".format(metrics_key) ) metrics = metrics[metrics_key] elif custom_metric_queries: metrics = assemble_nargs_to_dict(custom_metric_queries) if labels: labels = process_json_arg(labels, argument_name="labels") elif custom_labels: labels = assemble_nargs_to_dict(custom_labels) config_content = ConfigurationContent(**processed_content) config_metrics = ConfigurationMetrics(queries=metrics) config = Configuration( id=config_id, schema_version="2.0", labels=labels, content=config_content, metrics=config_metrics, target_condition=target_condition, etag="*", priority=priority, ) try: return service_sdk.configuration.create_or_update( id=config_id, configuration=config ) except CloudError as e: handle_service_exception(e) def _process_config_content(content, config_type): from knack.util import to_snake_case # Supports scenario where configuration payload is contained in 'content' key if "content" in content: content = content["content"] # Create new config dict to remove superflous properties processed_content = {} if config_type == ConfigType.adm: valid_adm_keys = ["deviceContent", "moduleContent"] if not all(key in content for key in valid_adm_keys): for key in valid_adm_keys: if key in content: processed_content[to_snake_case(key)] = content[key] return processed_content raise InvalidArgumentValueError( "Automatic device configuration payloads require property: {}".format( " or ".join(map(str, valid_adm_keys)) ) ) if config_type == ConfigType.edge or config_type == ConfigType.layered: valid_edge_key = "modulesContent" legacy_edge_key = "moduleContent" if valid_edge_key in content: processed_content[valid_edge_key] = content[valid_edge_key] elif legacy_edge_key in content: logger.warning( "'%s' is deprecated for edge deployments. Use '%s' instead - request is still processing...", legacy_edge_key, valid_edge_key, ) processed_content[valid_edge_key] = content[legacy_edge_key] if processed_content: # Schema based validation currently for IoT edge deployment only if config_type == ConfigType.edge: _validate_payload_schema(processed_content) processed_content[to_snake_case(valid_edge_key)] = processed_content[ valid_edge_key ] del processed_content[valid_edge_key] return processed_content raise InvalidArgumentValueError( "Edge deployment payloads require property: {}".format(valid_edge_key) ) def _validate_payload_schema(content): import json from os.path import join from azext_iot.models.validators import JsonSchemaType, JsonSchemaValidator from azext_iot.constants import EDGE_DEPLOYMENT_ROOT_SCHEMAS_PATH as root_schema_path from azext_iot.common.utility import shell_safe_json_parse EDGE_AGENT_SCHEMA_PATH = "azure-iot-edgeagent-deployment-{}.json" EDGE_HUB_SCHEMA_PATH = "azure-iot-edgehub-deployment-{}.json" EDGE_SCHEMA_PATH_DICT = { "$edgeAgent": EDGE_AGENT_SCHEMA_PATH, "$edgeHub": EDGE_HUB_SCHEMA_PATH, } modules_content = content["modulesContent"] system_modules_for_validation = ["$edgeAgent", "$edgeHub"] for sys_module in system_modules_for_validation: if sys_module in modules_content: if ( "properties.desired" in modules_content[sys_module] and "schemaVersion" in modules_content[sys_module]["properties.desired"] ): target_schema_ver = modules_content[sys_module][ "properties.desired" ]["schemaVersion"] target_schema_def_path = join(root_schema_path, f"{EDGE_SCHEMA_PATH_DICT[sys_module].format(target_schema_ver)}") logger.info("Attempting to fetch schema content from %s...", target_schema_def_path) if not exists(target_schema_def_path): logger.info("Invalid schema path %s, skipping validation...", target_schema_def_path) continue try: target_schema_def = str(read_file_content(target_schema_def_path)) target_schema_def = shell_safe_json_parse(target_schema_def) except Exception: logger.info( "Unable to fetch schema content from %s skipping validation...", target_schema_def_path, ) continue logger.info(f"Validating {sys_module} of deployment payload against schema...") to_validate_content = { sys_module: modules_content[sys_module] } draft_version = JsonSchemaType.draft4 if "$schema" in target_schema_def and "/draft-07/" in target_schema_def["$schema"]: draft_version = JsonSchemaType.draft7 v = JsonSchemaValidator(target_schema_def, draft_version) errors = v.validate(to_validate_content) if errors: # Pretty printing schema validation errors raise ValidationError( json.dumps( {"validationErrors": errors}, separators=(",", ":"), indent=2, ) ) def iot_hub_configuration_update( cmd, config_id, parameters, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): from azext_iot.sdk.iothub.service.models import Configuration from azext_iot.common.utility import verify_transform discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") verify = {"metrics": dict, "metrics.queries": dict, "content": dict} if parameters.get("labels"): verify["labels"] = dict verify_transform(parameters, verify) config = Configuration( id=parameters["id"], schema_version=parameters["schemaVersion"], labels=parameters["labels"], content=parameters["content"], metrics=parameters.get("metrics", None), target_condition=parameters["targetCondition"], priority=parameters["priority"], ) return service_sdk.configuration.create_or_update( id=config_id, configuration=config, custom_headers=headers ) except CloudError as e: handle_service_exception(e) except (AttributeError, TypeError) as err: raise CLIInternalError(err) def iot_hub_configuration_show( cmd, config_id, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_hub_configuration_show(target=target, config_id=config_id) def _iot_hub_configuration_show(target, config_id): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: return service_sdk.configuration.get(id=config_id, raw=True).response.json() except CloudError as e: handle_service_exception(e) def iot_hub_configuration_list( cmd, hub_name_or_hostname=None, top=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) result = _iot_hub_configuration_list(target) filtered = [ c for c in result if ( c["content"].get("deviceContent") is not None or c["content"].get("moduleContent") is not None ) ] return filtered[:top] # list[:None] == list[:len(list)] def iot_edge_deployment_list( cmd, hub_name_or_hostname=None, top=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) result = _iot_hub_configuration_list(target) filtered = [c for c in result if c["content"].get("modulesContent") is not None] return filtered[:top] # list[:None] == list[:len(list)] def _iot_hub_configuration_list(target): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: result = service_sdk.configuration.get_configurations(raw=True).response.json() if not result: hub_name_or_hostname = target["name"] logger.info('No configurations found on hub "%s".', hub_name_or_hostname) return result except CloudError as e: handle_service_exception(e) def iot_hub_configuration_delete( cmd, config_id, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_hub_configuration_delete(target, config_id, etag) def _iot_hub_configuration_delete(target, config_id, etag=None): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") service_sdk.configuration.delete(id=config_id, custom_headers=headers) except CloudError as e: handle_service_exception(e) def iot_edge_deployment_metric_show( cmd, config_id, metric_id, metric_type="user", hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): return iot_hub_configuration_metric_show( cmd, config_id=config_id, metric_id=metric_id, metric_type=metric_type, hub_name_or_hostname=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type_dataplane=auth_type_dataplane, ) def iot_hub_configuration_metric_show( cmd, config_id, metric_id, metric_type="user", hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: config = _iot_hub_configuration_show(target=target, config_id=config_id) metric_collection = None if metric_type == "system": metric_collection = config["systemMetrics"].get("queries") else: metric_collection = config["metrics"].get("queries") if metric_id not in metric_collection: raise InvalidArgumentValueError( "The {} metric '{}' is not defined in the configuration '{}'".format( metric_type, metric_id, config_id ) ) metric_query = metric_collection[metric_id] query_args = [metric_query] query_method = service_sdk.query.get_twins metric_result = _execute_query(query_args, query_method, None) output = {} output["metric"] = metric_id output["query"] = metric_query output["result"] = metric_result return output except CloudError as e: handle_service_exception(e) # Device Twin def iot_device_twin_show( cmd, device_id, hub_name_or_hostname=None, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_twin_show(target=target, device_id=device_id) def _iot_device_twin_show(target, device_id): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: return service_sdk.devices.get_twin(id=device_id, raw=True).response.json() except CloudError as e: handle_service_exception(e) def iot_device_twin_list( cmd, hub_name_or_hostname=None, top=1000, edge_enabled=False, resource_group_name=None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_twin_list(target, edge_enabled, top) def _iot_device_twin_list(target, edge_enabled=False, top=1000): query = ( "select * from devices where capabilities.iotEdge = true" if edge_enabled else "select * from devices" ) result = _iot_query(target=target, query_command=query, top=top) if not result: hub_name_or_hostname = target["name"] logger.info('No registered devices found on hub "%s".', hub_name_or_hostname) return result def iot_twin_update_custom(instance, desired=None, tags=None): payload = {} is_patch = False if desired: is_patch = True payload["properties"] = {"desired": process_json_arg(desired, "desired")} if tags: is_patch = True payload["tags"] = process_json_arg(tags, "tags") return payload if is_patch else instance def iot_device_twin_update( cmd, device_id, parameters, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_twin_update(target, device_id, parameters, etag) def _iot_device_twin_update( target, device_id, parameters, etag=None, ): from azext_iot.common.utility import verify_transform resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") verify = {} if parameters.get("properties"): if parameters["properties"].get("desired"): verify = {"properties.desired": dict} if parameters.get("tags"): verify["tags"] = dict verify_transform(parameters, verify) return service_sdk.devices.update_twin( id=device_id, device_twin_info=parameters, custom_headers=headers ) except CloudError as e: handle_service_exception(e) except (AttributeError, TypeError) as err: raise CLIInternalError(err) def iot_device_twin_replace( cmd, device_id, target_json, hub_name_or_hostname=None, resource_group_name=None, login=None, etag=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_device_twin_replace(target, device_id, target_json, etag) def _iot_device_twin_replace(target, device_id, target_json, etag=None): resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) try: target_json = process_json_arg(target_json, argument_name="json") headers = {} headers["If-Match"] = '"{}"'.format(etag if etag else "*") return service_sdk.devices.replace_twin( id=device_id, device_twin_info=target_json, custom_headers=headers ) except CloudError as e: handle_service_exception(e) def iot_device_method( cmd, device_id, method_name, hub_name_or_hostname=None, method_payload="{}", timeout=30, resource_group_name=None, login=None, auth_type_dataplane=None, ): from azext_iot.constants import ( METHOD_INVOKE_MAX_TIMEOUT_SEC, METHOD_INVOKE_MIN_TIMEOUT_SEC, ) if timeout > METHOD_INVOKE_MAX_TIMEOUT_SEC: raise InvalidArgumentValueError( "timeout must not be over {} seconds".format(METHOD_INVOKE_MAX_TIMEOUT_SEC) ) if timeout < METHOD_INVOKE_MIN_TIMEOUT_SEC: raise InvalidArgumentValueError( "timeout must be at least {} seconds".format(METHOD_INVOKE_MIN_TIMEOUT_SEC) ) discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) # Prevent msrest locking up shell service_sdk.config.retry_policy.retries = 1 try: if method_payload: method_payload = process_json_arg( method_payload, argument_name="method-payload" ) request_body = { "methodName": method_name, "payload": method_payload, "responseTimeoutInSeconds": timeout, "connectTimeoutInSeconds": timeout, } return service_sdk.devices.invoke_method( device_id=device_id, direct_method_request=request_body, timeout=timeout, ) except CloudError as e: handle_service_exception(e) # Device Module Method Invoke def iot_device_module_method( cmd, device_id, module_id, method_name, hub_name_or_hostname=None, method_payload="{}", timeout=30, resource_group_name=None, login=None, auth_type_dataplane=None, ): from azext_iot.constants import ( METHOD_INVOKE_MAX_TIMEOUT_SEC, METHOD_INVOKE_MIN_TIMEOUT_SEC, ) if timeout > METHOD_INVOKE_MAX_TIMEOUT_SEC: raise InvalidArgumentValueError( "timeout must not be over {} seconds".format(METHOD_INVOKE_MAX_TIMEOUT_SEC) ) if timeout < METHOD_INVOKE_MIN_TIMEOUT_SEC: raise InvalidArgumentValueError( "timeout must not be over {} seconds".format(METHOD_INVOKE_MIN_TIMEOUT_SEC) ) discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) resolver = SdkResolver(target=target) service_sdk = resolver.get_sdk(SdkType.service_sdk) # Prevent msrest locking up shell service_sdk.config.retry_policy.retries = 1 try: if method_payload: method_payload = process_json_arg( method_payload, argument_name="method-payload" ) request_body = { "methodName": method_name, "payload": method_payload, "responseTimeoutInSeconds": timeout, "connectTimeoutInSeconds": timeout, } return service_sdk.modules.invoke_method( device_id=device_id, module_id=module_id, direct_method_request=request_body, timeout=timeout, ) except CloudError as e: handle_service_exception(e) # Utility def iot_get_sas_token( cmd, hub_name_or_hostname=None, device_id=None, policy_name="iothubowner", key_type="primary", duration=3600, resource_group_name=None, login=None, module_id=None, auth_type_dataplane=None, connection_string=None, ): key_type = key_type.lower() policy_name = policy_name.lower() if login and policy_name != "iothubowner": raise ArgumentUsageError( "You are unable to change the sas policy with a hub connection string login." ) if login and key_type != "primary" and not device_id: raise ArgumentUsageError( "For non-device sas, you are unable to change the key type with a connection string login." ) if module_id and not device_id: raise ArgumentUsageError( "You are unable to get sas token for module without device information." ) if connection_string: return { DeviceAuthApiType.sas.value: _iot_build_sas_token_from_cs( connection_string, duration, ).generate_sas_token() } return { DeviceAuthApiType.sas.value: _iot_build_sas_token( cmd, hub_name_or_hostname, device_id, module_id, policy_name, key_type, duration, resource_group_name, login, auth_type_dataplane, ).generate_sas_token() } def _iot_build_sas_token_from_cs(connection_string, duration=3600): uri = None policy = None key = None parsed_cs = None all_parsers = [ ConnectionStringParser.Module, ConnectionStringParser.Device, ConnectionStringParser.IotHub, ] for parser in all_parsers: try: parsed_cs = parser(connection_string) if "SharedAccessKeyName" in parsed_cs: policy = parsed_cs["SharedAccessKeyName"] key = parsed_cs["SharedAccessKey"] if parser == ConnectionStringParser.IotHub: uri = parsed_cs["HostName"] elif parser == ConnectionStringParser.Module: uri = "{}/devices/{}/modules/{}".format( parsed_cs["HostName"], parsed_cs["DeviceId"], parsed_cs["ModuleId"] ) elif parser == ConnectionStringParser.Device: uri = "{}/devices/{}".format(parsed_cs["HostName"], parsed_cs["DeviceId"]) else: raise InvalidArgumentValueError("Given Connection String was not in a supported format.") return SasTokenAuthentication(uri, policy, key, duration) except ValueError: continue raise InvalidArgumentValueError("Given Connection String was not in a supported format.") def _iot_build_sas_token( cmd, hub_name_or_hostname=None, device_id=None, module_id=None, policy_name="iothubowner", key_type="primary", duration=3600, resource_group_name=None, login=None, auth_type_dataplane=None, ): from azext_iot.common._azure import ( parse_iot_device_connection_string, parse_iot_device_module_connection_string, ) # There is no dataplane operation for a pure IoT Hub sas token if all([device_id is None, module_id is None]): auth_type_dataplane = "key" discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, policy_name=policy_name, login=login, auth_type=auth_type_dataplane, ) uri = None policy = None key = None if device_id: logger.info( 'Obtaining device "%s" details from registry, using IoT Hub policy "%s"', device_id, policy_name, ) device = _iot_device_show(target, device_id) if module_id: module = _iot_device_module_show(target, device_id, module_id) module_cs = _build_device_or_module_connection_string( entity=module, key_type=key_type ) uri = "{}/devices/{}/modules/{}".format( target["entity"], device_id, module_id ) try: parsed_module_cs = parse_iot_device_module_connection_string(module_cs) except ValueError as e: logger.debug(e) raise CLIInternalError("This module does not support SAS auth.") key = parsed_module_cs["SharedAccessKey"] else: device_cs = _build_device_or_module_connection_string( entity=device, key_type=key_type ) uri = "{}/devices/{}".format(target["entity"], device_id) try: parsed_device_cs = parse_iot_device_connection_string(device_cs) except ValueError as e: logger.debug(e) raise CLIInternalError("This device does not support SAS auth.") key = parsed_device_cs["SharedAccessKey"] else: uri = target["entity"] policy = target["policy"] key = target["primarykey"] if key_type == "primary" else target["secondarykey"] return SasTokenAuthentication(uri, policy, key, duration) def _build_device_or_module_connection_string(entity, key_type="primary"): is_device = entity.get("moduleId") is None template = ( "HostName={};DeviceId={};{}" if is_device else "HostName={};DeviceId={};ModuleId={};{}" ) auth = entity["authentication"] auth_type = auth["type"].lower() if auth_type == DeviceAuthApiType.sas.value.lower(): key = "SharedAccessKey={}".format( auth["symmetricKey"]["primaryKey"] if key_type == "primary" else auth["symmetricKey"]["secondaryKey"] ) elif auth_type in [ DeviceAuthApiType.certificateAuthority.value.lower(), DeviceAuthApiType.selfSigned.value.lower(), ]: key = "x509=true" else: raise CLIInternalError("Unable to form target connection string") if is_device: return template.format(entity.get("hub"), entity.get("deviceId"), key) else: return template.format( entity.get("hub"), entity.get("deviceId"), entity.get("moduleId"), key ) def iot_get_device_connection_string( cmd, device_id, hub_name_or_hostname=None, key_type="primary", resource_group_name=None, login=None, auth_type_dataplane=None, ): result = {} device = iot_device_show( cmd, device_id, hub_name_or_hostname=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type_dataplane=auth_type_dataplane, ) result["connectionString"] = _build_device_or_module_connection_string( device, key_type ) return result def iot_get_module_connection_string( cmd, device_id, module_id, hub_name_or_hostname=None, key_type="primary", resource_group_name=None, login=None, auth_type_dataplane=None, ): result = {} module = iot_device_module_show( cmd, device_id, module_id, resource_group_name=resource_group_name, hub_name_or_hostname=hub_name_or_hostname, login=login, auth_type_dataplane=auth_type_dataplane, ) result["connectionString"] = _build_device_or_module_connection_string( module, key_type ) return result def _get_service_sdk( cmd, hub_name_or_hostname: str, resource_group_name: str = None, login=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) resolver = SdkResolver(target=target) return resolver.get_sdk(SdkType.service_sdk) def _generate_blob_container_uri( cmd, storage_account_name: str, blob_container_name: str, identity: str = None, ): from azext_iot.common.embedded_cli import EmbeddedCLI if blob_container_name is None or storage_account_name is None: raise ClientRequestError( "Storage account and Blob container names are required to generate blob container uri" ) cli = EmbeddedCLI(cli_ctx=cmd.cli_ctx) storage_endpoint = cli.invoke( "storage account show -n '{}'".format( storage_account_name ) ).as_json()["primaryEndpoints"]["blob"] container_sas_url = storage_endpoint + blob_container_name if not identity: storage_cstring = cli.invoke( "storage account show-connection-string -n '{}'".format( storage_account_name ) ).as_json()["connectionString"] sas_token = generate_storage_account_sas_token(storage_cstring, read=True, write=True, create=True, add=True, delete=True) container_sas_url = container_sas_url + "?" + sas_token return container_sas_url def _create_export_import_job_properties( job_type: str, input_blob_container_uri: str = None, output_blob_container_uri: str = None, include_keys: bool = False, identity: str = None, ): from azext_iot.common.shared import AuthenticationType from azext_iot.sdk.iothub.service.models import ( JobProperties, ManagedIdentity ) job_properties = JobProperties() if job_type == JobType.exportDevices.value: job_properties.exclude_keys_in_export = not include_keys elif job_type == JobType.importDevices.value: if exists(input_blob_container_uri): input_blob_container_uri = read_file_content(input_blob_container_uri) job_properties.input_blob_container_uri = input_blob_container_uri else: raise ClientRequestError( "Invalid job type: {}".format(job_type) ) job_properties.type = job_type if exists(output_blob_container_uri): output_blob_container_uri = read_file_content(output_blob_container_uri) job_properties.output_blob_container_uri = output_blob_container_uri if identity is None: job_properties.storage_authentication_type = AuthenticationType.keyBased.name else: job_properties.storage_authentication_type = AuthenticationType.identityBased.name if identity != "[system]": job_properties.identity = ManagedIdentity(user_assigned_identity=identity) return job_properties def iot_device_export( cmd, hub_name_or_hostname: str = None, blob_container_uri: str = None, blob_container_name: str = None, storage_account_name: str = None, include_keys: bool = False, storage_authentication_type: str = None, identity: str = None, resource_group_name: str = None, login=None, auth_type_dataplane=None, ): if blob_container_uri is None: blob_container_uri = _generate_blob_container_uri( cmd, storage_account_name, blob_container_name, identity ) if storage_authentication_type is not None: logger.warning( "The parameter --sat/--storage-authentication-type has been deprecated and should not be provided. " ) logger.warning( "The parameter --auth-type is now used to specify IoT Hub data access auth type instead of storage access auth type. " ) service_sdk = _get_service_sdk( cmd, hub_name_or_hostname, resource_group_name, login, auth_type_dataplane ) export_job_properties = _create_export_import_job_properties( job_type=JobType.exportDevices.value, output_blob_container_uri=blob_container_uri, include_keys=include_keys, identity=identity ) try: return service_sdk.jobs.create_import_export_job(export_job_properties) except CloudError as e: handle_service_exception(e) def iot_device_import( cmd, hub_name_or_hostname: str = None, input_blob_container_uri: str = None, input_blob_container_name: str = None, input_storage_account_name: str = None, output_blob_container_uri: str = None, output_blob_container_name: str = None, output_storage_account_name: str = None, storage_authentication_type: str = None, resource_group_name: str = None, identity: str = None, login=None, auth_type_dataplane=None, ): if input_blob_container_uri is None: input_blob_container_uri = _generate_blob_container_uri( cmd, input_storage_account_name, input_blob_container_name, identity ) if output_blob_container_uri is None: output_blob_container_uri = _generate_blob_container_uri( cmd, output_storage_account_name, output_blob_container_name, identity ) if storage_authentication_type is not None: logger.warning( "The parameter --sat/--storage-authentication-type has been deprecated and should not be provided. " ) logger.warning( "The parameter --auth-type is now used to specify IoT Hub data access auth type instead of storage access auth type. " ) service_sdk = _get_service_sdk( cmd, hub_name_or_hostname, resource_group_name, login, auth_type_dataplane ) import_job_properties = _create_export_import_job_properties( job_type=JobType.importDevices.value, input_blob_container_uri=input_blob_container_uri, output_blob_container_uri=output_blob_container_uri, identity=identity ) try: return service_sdk.jobs.create_import_export_job(import_job_properties) except CloudError as e: handle_service_exception(e) def iot_hub_monitor_events( cmd, hub_name_or_hostname=None, device_id=None, interface=None, module_id=None, consumer_group="$Default", timeout=300, enqueued_time=None, resource_group_name=None, yes=False, properties=None, repair=False, login=None, content_type=None, device_query=None, message_count: Optional[int] = None, ): try: _iot_hub_monitor_events( cmd, hub_name_or_hostname=hub_name_or_hostname, device_id=device_id, interface_name=interface, module_id=module_id, consumer_group=consumer_group, timeout=timeout, enqueued_time=enqueued_time, resource_group_name=resource_group_name, yes=yes, properties=properties, repair=repair, login=login, content_type=content_type, device_query=device_query, message_count=message_count, ) except RuntimeError as e: raise CLIInternalError(e) def iot_hub_monitor_feedback( cmd, hub_name_or_hostname=None, device_id=None, yes=False, wait_on_id=None, repair=False, resource_group_name=None, login=None, auth_type_dataplane=None, ): from azext_iot.common.deps import ensure_uamqp config = cmd.cli_ctx.config ensure_uamqp(config, yes, repair) discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, login=login, auth_type=auth_type_dataplane, ) return _iot_hub_monitor_feedback( target=target, device_id=device_id, wait_on_id=wait_on_id ) def iot_hub_distributed_tracing_show( cmd, hub_name_or_hostname, device_id, resource_group_name=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, auth_type=auth_type_dataplane ) device_twin = _iot_hub_distributed_tracing_show(discovery=discovery, target=target, device_id=device_id) return _customize_device_tracing_output( device_twin["deviceId"], device_twin["properties"]["desired"], device_twin["properties"]["reported"], ) def _iot_hub_monitor_events( cmd, interface_name=None, module_id=None, hub_name_or_hostname=None, device_id=None, consumer_group="$Default", timeout=300, enqueued_time=None, resource_group_name=None, yes=False, properties=None, repair=False, login=None, content_type=None, device_query=None, message_count: Optional[int] = None, ): (enqueued_time, properties, timeout, output, message_count) = init_monitoring( cmd, timeout, properties, enqueued_time, repair, yes, message_count ) device_ids = {} if device_query: devices_result = iot_query( cmd, device_query, hub_name_or_hostname, None, resource_group_name, login=login ) if devices_result: for device_result in devices_result: device_ids[device_result["deviceId"]] = True discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, include_events=True, login=login, ) from azext_iot.monitor.builders import hub_target_builder from azext_iot.monitor.handlers import CommonHandler from azext_iot.monitor.telemetry import start_single_monitor from azext_iot.monitor.utility import generate_on_start_string from azext_iot.monitor.models.arguments import ( CommonParserArguments, CommonHandlerArguments, ) target = hub_target_builder.EventTargetBuilder().build_iot_hub_target(target) target.add_consumer_group(consumer_group) on_start_string = generate_on_start_string(device_id=device_id) parser_args = CommonParserArguments( properties=properties, content_type=content_type ) handler_args = CommonHandlerArguments( output=output, common_parser_args=parser_args, devices=device_ids, device_id=device_id, interface_name=interface_name, module_id=module_id, max_messages=message_count, ) handler = CommonHandler(handler_args) start_single_monitor( target=target, enqueued_time_utc=enqueued_time, on_start_string=on_start_string, on_message_received=handler.parse_message, timeout=timeout, ) def iot_hub_distributed_tracing_update( cmd, hub_name_or_hostname, device_id, sampling_mode, sampling_rate, resource_group_name=None, auth_type_dataplane=None, ): discovery = IotHubDiscovery(cmd) target = discovery.get_target( resource_name=hub_name_or_hostname, resource_group_name=resource_group_name, include_events=True, auth_type=auth_type_dataplane, ) if int(sampling_rate) not in range(0, 101): raise InvalidArgumentValueError( "Sampling rate is a percentage, So only values from 0 to 100(inclusive) are permitted." ) device_twin = _iot_hub_distributed_tracing_show(discovery=discovery, target=target, device_id=device_id) if TRACING_PROPERTY not in device_twin["properties"]["desired"]: device_twin["properties"]["desired"][TRACING_PROPERTY] = {} device_twin["properties"]["desired"][TRACING_PROPERTY]["sampling_rate"] = int( sampling_rate ) device_twin["properties"]["desired"][TRACING_PROPERTY]["sampling_mode"] = ( 1 if sampling_mode.lower() == "on" else 2 ) result = iot_device_twin_update( cmd, device_id, device_twin, hub_name_or_hostname, resource_group_name ) return _customize_device_tracing_output( result.device_id, result.properties.desired, result.properties.reported ) def iot_hub_connection_string_show( cmd, hub_name_or_hostname=None, resource_group_name=None, policy_name="iothubowner", key_type=KeyType.primary.value, show_all=False, default_eventhub=False, ): discovery = IotHubDiscovery(cmd) if hub_name_or_hostname is None: hubs = discovery.get_resources(resource_group_name) if hubs is None: raise ResourceNotFoundError("No IoT Hub found.") def conn_str_getter(hub): return _get_hub_connection_string( discovery, hub, policy_name, key_type, show_all, default_eventhub ) connection_strings = [] for hub in hubs: if hub.properties.state == IoTHubStateType.Active.value: try: connection_strings.append( { "name": hub.name, "connectionString": conn_str_getter(hub) if show_all else conn_str_getter(hub)[0], } ) except Exception: logger.warning( f"Warning: The IoT Hub {hub.name} in resource group " + f"{hub.additional_properties['resourcegroup']} does " + f"not have the target policy {policy_name}." ) else: logger.warning( f"Warning: The IoT Hub {hub.name} in resource group " + f"{hub.additional_properties['resourcegroup']} is skipped " + "because the hub is not active." ) return connection_strings hub = discovery.find_resource(hub_name_or_hostname, resource_group_name) if hub: conn_str = _get_hub_connection_string( discovery, hub, policy_name, key_type, show_all, default_eventhub ) return {"connectionString": conn_str if show_all else conn_str[0]} def _get_hub_connection_string( discovery, hub, policy_name, key_type, show_all, default_eventhub ): policies = [] if show_all: policies.extend( discovery.get_policies(hub.name, hub.additional_properties["resourcegroup"]) ) else: policies.append( discovery.find_policy( hub.name, hub.additional_properties["resourcegroup"], policy_name ) ) if default_eventhub: cs_template_eventhub = ( "Endpoint={};SharedAccessKeyName={};SharedAccessKey={};EntityPath={}" ) endpoint = hub.properties.event_hub_endpoints["events"].endpoint entityPath = hub.properties.event_hub_endpoints["events"].path return [ cs_template_eventhub.format( endpoint, p.key_name, p.secondary_key if key_type == KeyType.secondary.value else p.primary_key, entityPath, ) for p in policies if "serviceconnect" in ( p.rights.value.lower() if isinstance(p.rights, (Enum, EnumMeta)) else p.rights.lower() ) ] hostname = hub.properties.host_name cs_template = "HostName={};SharedAccessKeyName={};SharedAccessKey={}" return [ cs_template.format( hostname, p.key_name, p.secondary_key if key_type == KeyType.secondary.value else p.primary_key, ) for p in policies ] def _iot_hub_monitor_feedback(target, device_id, wait_on_id): from azext_iot.monitor import event event.monitor_feedback( target=target, device_id=device_id, wait_on_id=wait_on_id, token_duration=3600 ) def _iot_hub_distributed_tracing_show(discovery, target, device_id): device_twin = _iot_device_twin_show(target=target, device_id=device_id) _validate_device_tracing(discovery, target, device_twin) return device_twin def _validate_device_tracing(discovery, target, device_twin): if not all([target.get("location"), target.get("sku_tier")]): resource = discovery.find_resource(target["name"]) target["location"] = resource.location target["sku_tier"] = resource.sku.tier.value if isinstance(resource.sku.tier, (Enum, EnumMeta)) else resource.sku.tier if target["location"].lower() not in TRACING_ALLOWED_FOR_LOCATION: raise ClientRequestError( 'Distributed tracing isn\'t supported for the hub located at "{}" location.'.format( target["location"] ) ) if target["sku_tier"].lower() != TRACING_ALLOWED_FOR_SKU: raise ClientRequestError( 'Distributed tracing isn\'t supported for the hub belongs to "{}" sku tier.'.format( target["sku_tier"] ) ) if device_twin["capabilities"]["iotEdge"]: raise ClientRequestError( 'The device "{}" should be a non-edge device.'.format(device_twin["deviceId"]) ) def _customize_device_tracing_output(device_id, desired, reported): output = {} desired_tracing = desired.get(TRACING_PROPERTY, None) if desired_tracing: output["deviceId"] = device_id output["samplingMode"] = ( "enabled" if desired_tracing.get("sampling_mode") == 1 else "disabled" ) output["samplingRate"] = "{}%".format(desired_tracing.get("sampling_rate")) output["isSynced"] = False reported_tracing = reported.get(TRACING_PROPERTY, None) if ( reported_tracing and desired_tracing.get("sampling_mode") == reported_tracing.get("sampling_mode").get("value", None) and desired_tracing.get("sampling_rate") == reported_tracing.get("sampling_rate").get("value", None) ): output["isSynced"] = True return output