azext_iot/iothub/providers/device_messaging.py (475 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from os.path import exists, basename from time import time, sleep from typing import Dict, Optional from azext_iot.iothub.common import NON_DECODABLE_PAYLOAD from knack.log import get_logger from azext_iot.common.shared import DeviceAuthApiType, KeyType, ProtocolType, SdkType, SettleType from azext_iot.common.utility import ( handle_service_exception, process_json_arg, read_file_content, validate_key_value_pairs ) from azure.cli.core.azclierror import ( ArgumentUsageError, CLIInternalError, FileOperationError, InvalidArgumentValueError, MutuallyExclusiveArgumentError, RequiredArgumentMissingError, ) from azext_iot._factory import SdkResolver, CloudError from azext_iot.iothub.providers.base import IoTHubProvider from azext_iot.operations.hub import ( _build_device_or_module_connection_string, _iot_device_show, _iot_hub_monitor_feedback ) import pprint logger = get_logger(__name__) printer = pprint.PrettyPrinter(indent=2) class DeviceMessagingProvider(IoTHubProvider): def __init__( self, cmd, device_id: str, hub_name: Optional[str] = None, rg: Optional[str] = None, login: Optional[str] = None, auth_type_dataplane: Optional[str] = None ): super(DeviceMessagingProvider, self).__init__( cmd=cmd, hub_name=hub_name, rg=rg, login=login, auth_type_dataplane=auth_type_dataplane ) self.device_id = device_id # prob move this into base - there is one command that needs service + other providers use service sdk self.device_resolver = SdkResolver(target=self.target, device_id=device_id) self.device_sdk = self.device_resolver.get_sdk(SdkType.device_sdk) def device_send_message( self, data: str = "Ping from Az CLI IoT Extension", data_file_path: Optional[str] = None, properties: Optional[str] = None, msg_count: int = 1, device_symmetric_key: Optional[str] = None, certificate_file: Optional[str] = None, key_file: Optional[str] = None, passphrase: Optional[str] = None, model_id: Optional[str] = None, ): from azext_iot.iothub.providers.mqtt import MQTTProvider device = self._d2c_get_device_auth_props( symmetric_key=device_symmetric_key, certificate_file=certificate_file, key_file=key_file, passphrase=passphrase ) if properties: properties = validate_key_value_pairs(properties) device_connection_string = _build_device_or_module_connection_string(device, KeyType.primary.value) client_mqtt = MQTTProvider( hub_hostname=self.target["entity"], device_conn_string=device_connection_string, x509_files=device["authentication"].get("x509_files"), device_id=self.device_id, model_id=model_id ) for _ in range(msg_count): client_mqtt.send_d2c_message( message_content=data, message_file_path=data_file_path, properties=properties ) client_mqtt.shutdown() def device_send_message_http(self, data: str, headers: dict = None): try: return self.device_sdk.device.send_device_event( id=self.device_id, message=data, custom_headers=headers ) except CloudError as e: handle_service_exception(e) def c2d_message_complete(self, etag: str): try: return self.device_sdk.device.complete_device_bound_notification( id=self.device_id, etag=etag ) except CloudError as e: handle_service_exception(e) def c2d_message_reject(self, etag: str): try: return self.device_sdk.device.complete_device_bound_notification( id=self.device_id, etag=etag, reject="" ) except CloudError as e: handle_service_exception(e) def c2d_message_abandon(self, etag: str): try: return self.device_sdk.device.abandon_device_bound_notification( id=self.device_id, etag=etag ) except CloudError as e: handle_service_exception(e) def c2d_message_receive( self, lock_timeout: int = 60, abandon: bool = False, complete: bool = False, reject: bool = False, ): ack = None ack_vals = [abandon, complete, reject] if any(ack_vals): if len(list(filter(lambda val: val, ack_vals))) > 1: raise MutuallyExclusiveArgumentError( "Only one c2d-message ack argument can be used [--complete, --abandon, --reject]" ) if abandon: ack = SettleType.abandon.value elif complete: ack = SettleType.complete.value elif reject: ack = SettleType.reject.value return self._c2d_message_receive(lock_timeout, ack) def _c2d_message_receive(self, lock_timeout: int = 60, ack: Optional[str] = None): from azext_iot.constants import MESSAGING_HTTP_C2D_SYSTEM_PROPERTIES request_headers = {} if lock_timeout: request_headers["IotHub-MessageLockTimeout"] = str(lock_timeout) try: result = self.device_sdk.device.receive_device_bound_notification( id=self.device_id, custom_headers=request_headers, raw=True ).response if result and result.status_code == 200: payload = {"properties": {}} if "etag" in result.headers: eTag = result.headers["etag"].strip('"') payload["etag"] = eTag if ack: ack_response = {} if ack == SettleType.abandon.value: logger.debug("__Abandoning message__") ack_response = ( self.device_sdk.device.abandon_device_bound_notification( id=self.device_id, etag=eTag, raw=True ) ) elif ack == SettleType.reject.value: logger.debug("__Rejecting message__") ack_response = ( self.device_sdk.device.complete_device_bound_notification( id=self.device_id, etag=eTag, reject="", raw=True ) ) else: logger.debug("__Completing message__") ack_response = ( self.device_sdk.device.complete_device_bound_notification( id=self.device_id, etag=eTag, raw=True ) ) payload["ack"] = ( ack if (ack_response and ack_response.response.status_code == 204) else None ) app_prop_prefix = "iothub-app-" app_prop_keys = [ header for header in result.headers if header.lower().startswith(app_prop_prefix) ] app_props = {} for key in app_prop_keys: app_props[key[len(app_prop_prefix) :]] = result.headers[key] if app_props: payload["properties"]["app"] = app_props sys_props = {} for key in MESSAGING_HTTP_C2D_SYSTEM_PROPERTIES: if key in result.headers: sys_props[key] = result.headers[key] if sys_props: payload["properties"]["system"] = sys_props if result.content: target_encoding = result.headers.get("content-encoding", "utf-8") payload["data"] = NON_DECODABLE_PAYLOAD if target_encoding in ["utf-8", "utf8", "utf-16", "utf16", "utf-32", "utf32"]: logger.info(f"Decoding message data encoded with: {target_encoding}") try: payload["data"] = result.content.decode(target_encoding) except Exception: pass return payload return except CloudError as e: handle_service_exception(e) def c2d_message_send( self, data: str = "Ping from Az CLI IoT Extension", data_file_path: Optional[str] = None, message_id: Optional[str] = None, correlation_id: Optional[str] = None, user_id: Optional[str] = None, content_encoding: str = "utf-8", content_type: Optional[str] = None, expiry_time_utc: Optional[str] = None, properties: Optional[str] = None, ack: Optional[str] = None, wait_on_feedback: bool = False, ): if wait_on_feedback and not ack: raise RequiredArgumentMissingError( 'To wait on device feedback, ack must be "full", "negative" or "positive"' ) if properties: properties = validate_key_value_pairs(properties) if expiry_time_utc: now_in_milli = int(time() * 1000) user_msg_expiry = int(expiry_time_utc) if user_msg_expiry < now_in_milli: raise InvalidArgumentValueError("Message expiry time utc is in the past!") from azext_iot.monitor import event msg_id, errors = event.send_c2d_message( target=self.target, device_id=self.device_id, data=data, data_file_path=data_file_path, message_id=message_id, correlation_id=correlation_id, user_id=user_id, content_encoding=content_encoding, content_type=content_type, expiry_time_utc=expiry_time_utc, properties=properties, ack=ack, ) if errors: raise CLIInternalError( "C2D message error: {}, use --debug for more details.".format(errors) ) if wait_on_feedback: _iot_hub_monitor_feedback(target=self.target, device_id=self.device_id, wait_on_id=msg_id) def c2d_message_purge(self): service_sdk = self.get_sdk(SdkType.service_sdk) return service_sdk.cloud_to_device_messages.purge_cloud_to_device_message_queue( self.device_id ) def simulate_device( self, receive_settle: str = "complete", data: str = "Ping from Az CLI IoT Extension", msg_count: int = 100, msg_interval: int = 3, protocol_type: str = "mqtt", properties: Optional[str] = None, device_symmetric_key: Optional[str] = None, certificate_file: Optional[str] = None, key_file: Optional[str] = None, passphrase: Optional[str] = None, method_response_code: Optional[str] = None, method_response_payload: Optional[str] = None, init_reported_properties: Optional[str] = None, model_id: Optional[str] = None, ): import sys import uuid import datetime import json from azext_iot.iothub.providers.mqtt import MQTTProvider from threading import Event, Thread from tqdm import tqdm from azext_iot.constants import ( MIN_SIM_MSG_INTERVAL, MIN_SIM_MSG_COUNT, SIM_RECEIVE_SLEEP_SEC, ) protocol_type = protocol_type.lower() if protocol_type == ProtocolType.mqtt.name: if receive_settle != "complete": raise InvalidArgumentValueError('mqtt protocol only supports settle type of "complete"') if msg_interval < MIN_SIM_MSG_INTERVAL: raise InvalidArgumentValueError("msg interval must be at least {}".format(MIN_SIM_MSG_INTERVAL)) if msg_count < MIN_SIM_MSG_COUNT: raise InvalidArgumentValueError("msg count must be at least {}".format(MIN_SIM_MSG_COUNT)) if protocol_type != ProtocolType.mqtt.name: if method_response_code: raise ArgumentUsageError( "'method-response-code' not supported, {} doesn't allow direct methods.".format(protocol_type) ) if method_response_payload: raise ArgumentUsageError( "'method-response-payload' not supported, {} doesn't allow direct methods.".format(protocol_type) ) if init_reported_properties: raise ArgumentUsageError( "'init-reported-properties' not supported, {} doesn't allow setting twin props".format(protocol_type) ) if any([certificate_file, key_file, passphrase]): raise ArgumentUsageError( "'certificate-file', 'key-file', and 'passphrase' not supported, {} doesn't allow x509 " "certificate authentication".format(protocol_type) ) if model_id: raise ArgumentUsageError( f"`model-id` is not supported with {protocol_type} protocol." ) properties_to_send = _simulate_get_default_properties(protocol_type) user_properties = validate_key_value_pairs(properties) or {} properties_to_send.update(user_properties) if method_response_payload: method_response_payload = process_json_arg( method_response_payload, argument_name="method-response-payload" ) if init_reported_properties: init_reported_properties = process_json_arg( init_reported_properties, argument_name="init-reported-properties" ) class generator(object): def __init__(self): self.calls = 0 def generate(self, jsonify=True): self.calls += 1 payload = { "id": str(uuid.uuid4()), "timestamp": str(datetime.datetime.utcnow()), "data": str(data + " #{}".format(self.calls)), } return json.dumps(payload) if jsonify else payload cancellation_token = Event() def http_wrap(generator, msg_interval, msg_count): for _ in tqdm(range(0, msg_count), desc='Sending and receiving events via https', ascii=' #'): d = generator.generate(False) self.device_send_message_http(d, headers=properties_to_send) if cancellation_token.wait(msg_interval): break try: device = self._d2c_get_device_auth_props( symmetric_key=device_symmetric_key, certificate_file=certificate_file, key_file=key_file, passphrase=passphrase ) if protocol_type == ProtocolType.mqtt.name: device_connection_string = _build_device_or_module_connection_string(device, KeyType.primary.value) client_mqtt = MQTTProvider( hub_hostname=self.target["entity"], device_conn_string=device_connection_string, x509_files=device["authentication"].get("x509_files"), device_id=self.device_id, method_response_code=method_response_code, method_response_payload=method_response_payload, init_reported_properties=init_reported_properties, model_id=model_id ) client_mqtt.execute( data=generator(), properties=properties_to_send, publish_delay=msg_interval, msg_count=msg_count ) client_mqtt.shutdown() else: op = Thread( target=http_wrap, args=(generator(), msg_interval, msg_count) ) op.start() while op.is_alive(): self._handle_c2d_msg(receive_settle) sleep(SIM_RECEIVE_SLEEP_SEC) except KeyboardInterrupt: sys.exit() except Exception as x: raise CLIInternalError(x) finally: if cancellation_token: cancellation_token.set() def device_upload_file( self, file_path: str, content_type: str, ): from azext_iot.sdk.iothub.device.models import FileUploadCompletionStatus if not exists(file_path): raise FileOperationError('File path "{}" does not exist!'.format(file_path)) content = read_file_content(file_path) file_name = basename(file_path) try: upload_meta = self.device_sdk.device.create_file_upload_sas_uri( device_id=self.device_id, blob_name=file_name, raw=True ).response.json() storage_endpoint = "{}/{}/{}{}".format( upload_meta["hostName"], upload_meta["containerName"], upload_meta["blobName"], upload_meta["sasToken"], ) completion_status = FileUploadCompletionStatus( correlation_id=upload_meta["correlationId"], is_success=True ) upload_response = self.device_sdk.device.upload_file_to_container( storage_endpoint=storage_endpoint, content=content, content_type=content_type, ) completion_status.status_code = upload_response.status_code completion_status.status_reason = upload_response.reason return self.device_sdk.device.update_file_upload_status( device_id=self.device_id, file_upload_completion_status=completion_status ) except CloudError as e: handle_service_exception(e) def _d2c_get_device_auth_props( self, symmetric_key: Optional[str] = None, certificate_file: Optional[str] = None, key_file: Optional[str] = None, passphrase: Optional[str] = None ): if symmetric_key: return { "hub": self.target["entity"], "deviceId": self.device_id, "authentication": { "type": DeviceAuthApiType.sas.value, "symmetricKey": { "primaryKey": symmetric_key } } } elif (certificate_file and key_file): # custom device structure to hold needed info # Note that here signed vs ca doesnt matter. CA will need a verified cert in the service # Note that for the CA device the subject of the cert must be the device_id return { "deviceId": self.device_id, "authentication": { "type": DeviceAuthApiType.selfSigned.value, "x509_files": { "certificateFile": certificate_file, "keyFile": key_file, "passphrase": passphrase } } } elif any([certificate_file, key_file, passphrase]): raise RequiredArgumentMissingError( "Both 'certificate-file' and 'key-file' required for x509 certificate authentication." ) else: # Get the device info from the service side return _iot_device_show(self.target, self.device_id) def _handle_c2d_msg(self, receive_settle: str, lock_timeout: int = 60): result = self._c2d_message_receive(lock_timeout) if result: print() print("C2D Message Handler [Received C2D message]:") printer.pprint(result) if receive_settle == "reject": print("C2D Message Handler [Rejecting message]") self.c2d_message_reject(result["etag"]) elif receive_settle == "abandon": print("C2D Message Handler [Abandoning message]") self.c2d_message_abandon(result["etag"]) else: print("C2D Message Handler [Completing message]") self.c2d_message_complete(result["etag"]) return True return False def _simulate_get_default_properties(protocol: str) -> Dict[str, str]: default_properties = {} is_mqtt = protocol == ProtocolType.mqtt.name default_properties["$.ct" if is_mqtt else "content-type"] = "application/json" default_properties["$.ce" if is_mqtt else "content-encoding"] = "utf-8" return default_properties