docker_images/pythonv2/wrapper/python_glue/internal_iothub_glue_async.py (247 lines of code) (raw):

# Copyright (c) Microsoft. All rights reserved. # Licensed under the MIT license. See LICENSE file in the project root for # full license information. import logging import convert from connection_status import ConnectionStatus from azure.iot.device import MethodResponse from azure.iot.device.aio import IoTHubDeviceClient, IoTHubModuleClient from azure.iot.device.common import mqtt_transport from internal_iothub_glue import get_kwargs import asyncio import threading import queue logger = logging.getLogger(__name__) DEFAULT_KEEPALIVE = 8 class Connect(ConnectionStatus): async def connect(self, transport_type, connection_string, cert): assert False async def disconnect(self): assert False async def create_from_connection_string( self, transport_type, connection_string, cert ): kwargs = get_kwargs(transport_type) if "GatewayHostName" in connection_string: self.client = self.client_class.create_from_connection_string( connection_string, server_verification_cert=cert, **kwargs ) else: self.client = self.client_class.create_from_connection_string( connection_string, **kwargs ) mqtt_transport.DEFAULT_KEEPALIVE = DEFAULT_KEEPALIVE self._attach_connect_event_watcher() async def create_from_x509(self, transport_type, x509): # BKTODO pass async def connect2(self): await self.client.connect() async def reconnect(self, force_renew_password): # BKTODO pass async def disconnect2(self): # disconnect2 keeps the object around. We might use it again await self.client.disconnect() async def destroy(self): if self.client: try: if hasattr(self.client, "shutdown"): await self.client.shutdown() else: await self.client.disconnect() finally: self.client = None class DeviceConnect(object): async def create_from_symmetric_key( self, transport_type, device_id, hostname, symmetric_key ): kwargs = get_kwargs(transport_type) self.client = self.client_class.create_from_symmetric_key( symmetric_key, hostname, device_id, **kwargs ) mqtt_transport.DEFAULT_KEEPALIVE = DEFAULT_KEEPALIVE self._attach_connect_event_watcher() class ModuleConnect(object): async def connect_from_environment(self, transport_type): assert False async def create_from_environment(self, transport_type): kwargs = get_kwargs(transport_type) self.client = IoTHubModuleClient.create_from_edge_environment(**kwargs) mqtt_transport.DEFAULT_KEEPALIVE = DEFAULT_KEEPALIVE self._attach_connect_event_watcher() async def create_from_symmetric_key( self, transport_type, device_id, module_id, hostname, symmetric_key ): kwargs = get_kwargs(transport_type) self.client = self.client_class.create_from_symmetric_key( symmetric_key, hostname, device_id, module_id, **kwargs ) mqtt_transport.DEFAULT_KEEPALIVE = DEFAULT_KEEPALIVE self._attach_connect_event_watcher() class HandleMethods(object): async def enable_methods(self): def on_method_request_received(req): with self.lock: if req.name not in self.method_queues: self.method_queues[req.name] = queue.Queue() self.method_queues[req.name].put(req) self.client.on_method_request_received = on_method_request_received # Python SDK 3.x needs to make an additional invocation if hasattr(self.client, "start_method_request_receive"): await self.client.start_method_request_receive() async def wait_for_method_and_return_response(self, methodName, requestAndResponse): with self.lock: if methodName not in self.method_queues: self.method_queues[methodName] = queue.Queue() # receive method request logger.info("Waiting for method request") request = await asyncio.get_event_loop().run_in_executor( None, self.method_queues[methodName].get ) logger.info("Method request received") # verify name and payload expected_name = methodName expected_payload = requestAndResponse.request_payload["payload"] if request.name == expected_name: if request.payload == expected_payload: logger.info("Method name and payload matched. Returning response") resp_status = requestAndResponse.status_code resp_payload = requestAndResponse.response_payload else: logger.info("Request payload doesn't match") logger.info("expected: " + expected_payload) logger.info("received: " + request.payload) resp_status = 500 resp_payload = None else: logger.info("Method name doesn't match") logger.info("expected: '" + expected_name + "'") logger.info("received: '" + request.name + "'") resp_status = 404 resp_payload = None # send method response response = MethodResponse( request_id=request.request_id, status=resp_status, payload=resp_payload ) await self.client.send_method_response(response) logger.info("Method response sent") class Twin(object): async def enable_twin(self): def on_patch_received(patch): self.twin_patch_queue.put(patch) self.client.on_twin_desired_properties_patch_received = on_patch_received # Python SDK 3.x needs to make an additional invocation if hasattr(self.client, "start_twin_desired_properties_patch_receive"): await self.client.start_twin_desired_properties_patch_receive() async def wait_for_desired_property_patch(self): logger.info("Waiting for desired property patch") patch = await asyncio.get_event_loop().run_in_executor( None, self.twin_patch_queue.get ) logger.info("patch received") return {"desired": patch} async def get_twin(self): logger.info("getting twin") twin = await self.client.get_twin() logger.info("done getting twin") return twin async def send_twin_patch(self, props): logger.info("setting reported property patch") await self.client.patch_twin_reported_properties(props.to_dict()["reported"]) logger.info("done setting reported properties") class C2d(object): async def enable_c2d(self): def on_message_received(msg): self.c2d_queue.put(msg) self.client.on_message_received = on_message_received # Python SDK 3.x needs to make an additional invocation if hasattr(self.client, "start_message_receive"): await self.client.start_message_receive() async def wait_for_c2d_message(self): logger.info("Waiting for c2d message") message = await asyncio.get_event_loop().run_in_executor( None, self.c2d_queue.get ) logger.info("Message received") return convert.incoming_message_to_test_script_object(message) class Telemetry(object): async def send_event(self, event_body): logger.info("sending event") await self.client.send_message( convert.test_script_object_to_outgoing_message(event_body) ) logger.info("send confirmation received") class InputsAndOutputs(object): async def enable_input_messages(self): def on_message_received(msg): with self.lock: if msg.input_name not in self.input_queues: self.input_queues[msg.input_name] = queue.Queue() self.input_queues[msg.input_name].put(msg) self.client.on_message_received = on_message_received # Python SDK 3.x needs to make an additional invocation if hasattr(self.client, "start_message_receive"): await self.client.start_message_receive() async def wait_for_input_message(self, input_name): with self.lock: if input_name not in self.input_queues: self.input_queues[input_name] = queue.Queue() logger.info("Waiting for input message") message = await asyncio.get_event_loop().run_in_executor( None, self.input_queues[input_name].get ) logger.info("Message received") return convert.incoming_message_to_test_script_object(message) async def send_output_event(self, output_name, event_body): logger.info("sending output event") await self.client.send_message_to_output( convert.test_script_object_to_outgoing_message(event_body), output_name ) logger.info("send confirmation received") class InvokeMethods(object): async def invoke_module_method( self, device_id, module_id, method_invoke_parameters ): logger.info("Invoking a method on the module.") method_response = await self.client.invoke_method( device_id=device_id, module_id=module_id, method_params=method_invoke_parameters, ) logger.info("Method Invoked and response received.") return method_response async def invoke_device_method(self, device_id, method_invoke_parameters): logger.info("Invoking a method on the module.") method_response = await self.client.invoke_method( device_id=device_id, method_params=method_invoke_parameters ) logger.info("Method Invoked and response received.") return method_response class BlobUpload(object): async def get_storage_info_for_blob(self, blob_name): info = await self.client.get_storage_info_for_blob(blob_name) return info async def notify_blob_upload_status( self, correlation_id, is_success, status_code, status_description ): await self.client.notify_blob_upload_status( correlation_id, is_success, status_code, status_description ) class InternalDeviceGlueAsync( Connect, DeviceConnect, HandleMethods, C2d, Telemetry, Twin, BlobUpload ): def __init__(self): self.client = None self.client_class = IoTHubDeviceClient self.connected = False self.c2d_queue = queue.Queue() self.twin_patch_queue = queue.Queue() self.method_queues = {} self.lock = threading.Lock() class InternalModuleGlueAsync( Connect, ModuleConnect, HandleMethods, C2d, Twin, Telemetry, InputsAndOutputs, InvokeMethods, ): def __init__(self): self.client = None self.client_class = IoTHubModuleClient self.connected = False self.c2d_queue = queue.Queue() self.twin_patch_queue = queue.Queue() self.method_queues = {} self.lock = threading.Lock() self.input_queues = {}