test-runner/adapters/direct_azure_rest/direct_service_api.py (41 lines of code) (raw):
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
from ..abstract_iothub_apis import AbstractServiceApi
from ..decorators import emulate_async
from .amqp_service_client import AmqpServiceClient
from azure.iot.hub import IoTHubRegistryManager, models
class ServiceApi(AbstractServiceApi):
def __init__(self):
self.service_connection_string = None
self.amqp_service_client = None
self.registry_manager = None
async def connect(self, service_connection_string):
self.service_connection_string = service_connection_string
self.registry_manager = IoTHubRegistryManager.from_connection_string(
service_connection_string
)
async def disconnect(self):
if self.amqp_service_client:
await self.amqp_service_client.disconnect()
self.amqp_serice_client = None
self.registry_manager = None
@emulate_async
def call_module_method(self, device_id, module_id, method_invoke_parameters):
return self.registry_manager.invoke_device_module_method(
device_id,
module_id,
models.CloudToDeviceMethod.from_dict(method_invoke_parameters),
).as_dict()
@emulate_async
def call_device_method(self, device_id, method_invoke_parameters):
return self.registry_manager.invoke_device_method(
device_id, models.CloudToDeviceMethod.from_dict(method_invoke_parameters)
).as_dict()
async def send_c2d(self, device_id, message):
if not self.amqp_service_client:
self.amqp_service_client = AmqpServiceClient()
await self.amqp_service_client.connect(self.service_connection_string)
await self.amqp_service_client.send_to_device(device_id, message)
async def get_blob_upload_status(self):
if not self.amqp_service_client:
self.amqp_service_client = AmqpServiceClient()
await self.amqp_service_client.connect(self.service_connection_string)
return await self.amqp_service_client.get_next_blob_status()