test-runner/adapters/direct_azure_rest/direct_registry_api.py (28 lines of code) (raw):

# Copyright (c) Microsoft. All rights reserved. # Licensed under the MIT license. See LICENSE file in the project root for # full license information. from ..abstract_iothub_apis import AbstractRegistryApi from ..decorators import emulate_async from azure.iot.hub import IoTHubRegistryManager, models class RegistryApi(AbstractRegistryApi): def __init__(self): self.registry_manager = None async def connect(self, service_connection_string): self.registry_manager = IoTHubRegistryManager.from_connection_string( service_connection_string ) async def disconnect(self): pass @emulate_async def get_module_twin(self, device_id, module_id): return self.registry_manager.get_module_twin(device_id, module_id).as_dict()[ "properties" ] @emulate_async def patch_module_twin(self, device_id, module_id, patch): twin = models.Twin.from_dict({"properties": patch}) self.registry_manager.update_module_twin(device_id, module_id, twin) @emulate_async def get_device_twin(self, device_id): return self.registry_manager.get_twin(device_id).as_dict()["properties"] @emulate_async def patch_device_twin(self, device_id, patch): twin = models.Twin.from_dict({"properties": patch}) self.registry_manager.update_twin(device_id, twin)