docker_images/pythonv2/wrapper/python_glue/device_glue.py (94 lines of code) (raw):

# Copyright (c) Microsoft. All rights reserved. # Licensed under the MIT license. See LICENSE file in the project root for # full license information. from swagger_server.models.connect_response import ConnectResponse from swagger_server.models.event_body import EventBody from internal_glue_factory import create_glue_object import json import logging logger = logging.getLogger(__name__) class DeviceGlue: object_count = 1 object_map = {} def _finish_connection(self, internal): connection_id = "deviceObject_" + str(self.object_count) self.object_count += 1 self.object_map[connection_id] = internal return ConnectResponse(connection_id) def connect_sync(self, transport_type, connection_string, ca_certificate): internal = create_glue_object("device", "sync_interface") internal.connect_sync(transport_type, connection_string, ca_certificate.cert) return self._finish_connection(internal) def disconnect_sync(self, connection_id): logger.info("disconnecting " + connection_id) if connection_id in self.object_map: internal = self.object_map[connection_id] internal.disconnect_sync() del self.object_map[connection_id] def create_from_connection_string_sync( self, transport_type, connection_string, ca_certificate ): internal = create_glue_object("device", "sync_interface") internal.create_from_connection_string_sync( transport_type, connection_string, ca_certificate.cert ) return self._finish_connection(internal) def create_from_x509_sync(self, transport_type, x509): internal = create_glue_object("device", "sync_interface") internal.create_from_x509_sync(transport_type, x509) return self._finish_connection(internal) def create_from_environment_sync(self, transport_type): internal = create_glue_object("device", "sync_interface") internal.create_from_environment_sync(transport_type) return self._finish_connection(internal) def connect2_sync(self, connection_id): self.object_map[connection_id].connect2_sync() def reconnect_sync(self, connection_id, force_renew_password): self.object_map[connection_id].reconnect_sync(force_renew_password) def disconnect2_sync(self, connection_id): self.object_map[connection_id].disconnect2_sync() def destroy_sync(self, connection_id): logger.info("destroying " + connection_id) if connection_id in self.object_map: internal = self.object_map[connection_id] internal.destroy_sync() del self.object_map[connection_id] def enable_methods_sync(self, connection_id): self.object_map[connection_id].enable_methods_sync() def enable_c2d_sync(self, connection_id): self.object_map[connection_id].enable_c2d_sync() def send_event_sync(self, connection_id, event_body): self.object_map[connection_id].send_event_sync(event_body) def wait_for_c2d_message_sync(self, connection_id): response = self.object_map[connection_id].wait_for_c2d_message_sync() return EventBody.from_dict(response) def wait_for_method_and_return_response_sync( self, connection_id, method_name, request_and_response ): self.object_map[connection_id].wait_for_method_and_return_response_sync( method_name, request_and_response ) def wait_for_desired_property_patch_sync(self, connection_id): return self.object_map[connection_id].wait_for_desired_property_patch_sync() def enable_twin_sync(self, connection_id): self.object_map[connection_id].enable_twin_sync() def get_twin_sync(self, connection_id): return self.object_map[connection_id].get_twin_sync() def send_twin_patch_sync(self, connection_id, props): self.object_map[connection_id].send_twin_patch_sync(props) def get_connection_status_sync(self, connection_id): return self.object_map[connection_id].get_connection_status_sync() def wait_for_connection_status_change_sync(self, connection_id, connection_status): return self.object_map[connection_id].wait_for_connection_status_change_sync( connection_status ) def cleanup_resources_sync(self): listcopy = list(self.object_map.keys()) for key in listcopy: logger.info("object {} not cleaned up".format(key)) self.disconnect_sync(key) def get_storage_info_for_blob_sync(self, connection_id, blob_name): return self.object_map[connection_id].get_storage_info_for_blob_sync(blob_name) def notify_blob_upload_status_sync( self, connection_id, correlation_id, is_success, status_code, status_description ): return self.object_map[connection_id].notify_blob_upload_status_sync( correlation_id, is_success, status_code, status_description )