docker_images/pythonv2/wrapper/python_glue/internal_iothub_glue.py (236 lines of code) (raw):
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import logging
import convert
import queue
import threading
from connection_status import ConnectionStatus
from azure.iot.device import IoTHubDeviceClient, IoTHubModuleClient, MethodResponse
logger = logging.getLogger(__name__)
DEFAULT_KEEPALIVE = 8
def get_kwargs(transport_type):
kwargs = {}
kwargs["keep_alive"] = DEFAULT_KEEPALIVE
if transport_type == "mqttws":
kwargs["websockets"] = True
return kwargs
class Connect(ConnectionStatus):
def connect_sync(self, transport_type, connection_string, cert):
assert False
def disconnect_sync(self):
assert False
def create_from_connection_string_sync(
self, transport_type, connection_string, cert
):
kwargs = get_kwargs(transport_type)
if "GatewayHostName" in connection_string:
self.client = self.client_class.create_from_connection_string(
connection_string, server_verification_cert=cert, **kwargs
)
else:
self.client = self.client_class.create_from_connection_string(
connection_string, **kwargs
)
self._attach_connect_event_watcher()
def create_from_x509_sync(self, transport_type, x509):
# BKTODO
pass
def connect2_sync(self):
self.client.connect()
def reconnect_sync(self, force_renew_password):
# BKTODO
pass
def disconnect2_sync(self):
# disconnect2 keeps the object around. We might use it again
self.client.disconnect()
def destroy_sync(self):
if self.client:
try:
if hasattr(self.client, "shutdown"):
self.client.shutdown()
else:
self.client.disconnect()
finally:
self.client = None
class DeviceConnect(object):
def create_from_symmetric_key_sync(
self, transport_type, device_id, hostname, symmetric_key
):
kwargs = get_kwargs(transport_type)
self.client = self.client_class.create_from_symmetric_key(
symmetric_key, hostname, device_id, **kwargs
)
self._attach_connect_event_watcher()
class ModuleConnect(object):
def connect_from_environment_sync(self, transport_type):
assert False
def create_from_environment_sync(self, transport_type):
kwargs = get_kwargs(transport_type)
self.client = self.client_class.create_from_edge_environment(**kwargs)
self._attach_connect_event_watcher()
def create_from_symmetric_key_sync(
self, transport_type, device_id, module_id, hostname, symmetric_key
):
kwargs = get_kwargs(transport_type)
self.client = self.client_class.create_from_symmetric_key(
symmetric_key, hostname, device_id, module_id, **kwargs
)
self._attach_connect_event_watcher()
class HandleMethods(object):
def enable_methods_sync(self):
def on_method_request_received(req):
with self.lock:
if req.name not in self.method_queues:
self.method_queues[req.name] = queue.Queue()
self.method_queues[req.name].put(req)
self.client.on_method_request_received = on_method_request_received
# Python SDK 3.x needs to make an additional invocation
if hasattr(self.client, "start_method_request_receive"):
self.client.start_method_request_receive()
def wait_for_method_and_return_response_sync(self, methodName, requestAndResponse):
with self.lock:
if methodName not in self.method_queues:
self.method_queues[methodName] = queue.Queue()
# receive method request
logger.info("Waiting for method request")
request = self.method_queues[methodName].get()
logger.info("Method request received")
# verify name and payload
expected_name = methodName
expected_payload = requestAndResponse.request_payload["payload"]
if request.name == expected_name:
if request.payload == expected_payload:
logger.info("Method name and payload matched. Returning response")
resp_status = requestAndResponse.status_code
resp_payload = requestAndResponse.response_payload
else:
logger.info("Request payload doesn't match")
logger.info("expected: " + expected_payload)
logger.info("received: " + request.payload)
resp_status = 500
resp_payload = None
else:
logger.info("Method name doesn't match")
logger.info("expected: '" + expected_name + "'")
logger.info("received: '" + request.name + "'")
resp_status = 404
resp_payload = None
# send method response
response = MethodResponse(
request_id=request.request_id, status=resp_status, payload=resp_payload
)
self.client.send_method_response(response)
logger.info("Method response sent")
class Twin(object):
def enable_twin_sync(self):
def on_patch_received(patch):
self.twin_patch_queue.put(patch)
self.client.on_twin_desired_properties_patch_received = on_patch_received
# Python SDK 3.x needs to make an additional invocation
if hasattr(self.client, "start_twin_desired_properties_patch_receive"):
self.client.start_twin_desired_properties_patch_receive()
def wait_for_desired_property_patch_sync(self):
logger.info("Waiting for desired property patch")
patch = self.twin_patch_queue.get()
logger.info("patch received")
logger.info(str(patch))
return {"desired": patch}
def get_twin_sync(self):
logger.info("getting twin")
twin = self.client.get_twin()
logger.info("done getting twin")
return twin
def send_twin_patch_sync(self, twin):
logger.info("setting reported property patch")
self.client.patch_twin_reported_properties(twin.reported)
logger.info("done setting reported properties")
class C2d(object):
def enable_c2d_sync(self):
def on_message_received(msg):
self.c2d_queue.put(msg)
self.client.on_message_received = on_message_received
# Python SDK 3.x needs to make an additional invocation
if hasattr(self.client, "start_message_receive"):
self.client.start_message_receive()
def wait_for_c2d_message_sync(self):
logger.info("Waiting for c2d message")
message = self.c2d_queue.get()
logger.info("Message received")
return convert.incoming_message_to_test_script_object(message)
class Telemetry(object):
def send_event_sync(self, event_body):
self.client.send_message(
convert.test_script_object_to_outgoing_message(event_body)
)
class InputsAndOutputs(object):
def enable_input_messages_sync(self):
def on_message_received(msg):
with self.lock:
if msg.input_name not in self.input_queues:
self.input_queues[msg.input_name] = queue.Queue()
self.input_queues[msg.input_name].put(msg)
self.client.on_message_received = on_message_received
# Python SDK 3.x needs to make an additional invocation
if hasattr(self.client, "start_message_receive"):
self.client.start_message_receive()
def wait_for_input_message_sync(self, input_name):
with self.lock:
if input_name not in self.input_queues:
self.input_queues[input_name] = queue.Queue()
logger.info("Waiting for input message")
message = self.input_queues[input_name].get()
logger.info("Message received")
logger.info(str(message))
converted = convert.incoming_message_to_test_script_object(message)
logger.info("Converted to:")
logger.info(str(converted))
logger.info("---")
return converted
def send_output_event_sync(self, output_name, event_body):
logger.info("sending output event")
self.client.send_message_to_output(
convert.test_script_object_to_outgoing_message(event_body), output_name
)
logger.info("send confirmation received")
class InvokeMethods(object):
def invoke_module_method_sync(self, device_id, module_id, method_invoke_parameters):
logger.info("Invoking a method on the module.")
method_response = self.client.invoke_method(
device_id=device_id,
module_id=module_id,
method_params=method_invoke_parameters,
)
logger.info("Method Invoked and response received.")
return method_response
def invoke_device_method_sync(self, device_id, method_invoke_parameters):
logger.info("Invoking a method on the module.")
method_response = self.client.invoke_method(
device_id=device_id, method_params=method_invoke_parameters
)
logger.info("Method Invoked and response received.")
return method_response
class BlobUpload(object):
def get_storage_info_for_blob_sync(self, blob_name):
return self.client.get_storage_info_for_blob(blob_name)
def notify_blob_upload_status_sync(
self, correlation_id, is_success, status_code, status_description
):
self.client.notify_blob_upload_status(
correlation_id, is_success, status_code, status_description
)
class InternalDeviceGlueSync(
Connect, DeviceConnect, HandleMethods, C2d, Twin, Telemetry, BlobUpload
):
def __init__(self):
self.client_class = IoTHubDeviceClient
self.client = None
self.c2d_queue = queue.Queue()
self.twin_patch_queue = queue.Queue()
self.method_queues = {}
self.lock = threading.Lock()
class InternalModuleGlueSync(
Connect,
ModuleConnect,
HandleMethods,
C2d,
Twin,
Telemetry,
InputsAndOutputs,
InvokeMethods,
):
def __init__(self):
self.client_class = IoTHubModuleClient
self.client = None
self.c2d_queue = queue.Queue()
self.twin_patch_queue = queue.Queue()
self.method_queues = {}
self.lock = threading.Lock()
self.input_queues = {}