test-runner/adapters/direct_azure_rest/amqp_service_client.py (69 lines of code) (raw):

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import json
import logging
from urllib.parse import quote_plus
from uuid import uuid4

import uamqp

from connection_string import connection_string_to_dictionary, generate_auth_token
from .. import adapter_config

logger = logging.getLogger(__name__)


def _build_iothub_amqp_endpoint(config):
    """Build the ``user:password@host`` part of an IoT Hub AMQP URI.

    Args:
        config: Mapping holding the ``HostName``, ``SharedAccessKeyName`` and
            ``SharedAccessKey`` values of a service connection string.

    Returns:
        A string of the form ``<quoted user>:<quoted SAS token>@<HostName>``,
        where the user is ``<SharedAccessKeyName>@sas.root.<hub name>`` and
        the hub name is the first dot-separated label of ``HostName``.
    """
    hub_name = config["HostName"].split(".")[0]
    username = "{}@sas.root.{}".format(config["SharedAccessKeyName"], hub_name)
    sas_token = generate_auth_token(
        config["HostName"],
        config["SharedAccessKeyName"],
        # NOTE(review): presumably restores a trailing base64 "=" that the
        # connection-string parser drops -- confirm against
        # connection_string_to_dictionary.
        config["SharedAccessKey"] + "=",
    )
    # Both credentials are percent-encoded so they can be embedded in a URI.
    return "{}:{}@{}".format(
        quote_plus(username), quote_plus(sas_token), config["HostName"]
    )


class AmqpServiceClient:
    """Service-side AMQP client for sending C2D messages and reading blob status.

    ``connect()`` must be called before ``send_to_device()`` or
    ``get_next_blob_status()``. ``disconnect()`` is safe to call at any time,
    including before ``connect()``.
    """

    def __init__(self):
        # Define all state up front so disconnect() and
        # get_next_blob_status() do not fail with AttributeError on an
        # instance that was never connected.
        self.config = None
        self.endpoint = None
        self.send_client = None
        self.blob_status_receive_client = None
        self.blob_status_receive_iter = None

    async def connect(self, service_connection_string):
        """Parse the connection string and create the send client.

        Args:
            service_connection_string: Service connection string; it is parsed
                with ``connection_string_to_dictionary``.
        """
        self.config = connection_string_to_dictionary(service_connection_string)
        self.endpoint = _build_iothub_amqp_endpoint(self.config)

        send_operation = "/messages/devicebound"
        send_target = "amqps://" + self.endpoint + send_operation
        # NOTE(review): send_target embeds the SAS token, so this log line
        # contains a credential.
        logger.info("send target: %s", send_target)

        self.send_client = uamqp.async_ops.client_async.SendClientAsync(
            send_target, debug=True
        )
        # The file-notification receiver is created lazily by
        # get_next_blob_status().
        self.blob_status_receive_client = None
        self.blob_status_receive_iter = None
        adapter_config.logger("AMQP service client connected")

    async def disconnect(self):
        """Close whichever clients are open and clear their references."""
        if self.send_client:
            await self.send_client.close_async()
            self.send_client = None
            adapter_config.logger("AMQP send client disconnected")

        if self.blob_status_receive_client:
            await self.blob_status_receive_client.close_async()
            self.blob_status_receive_client = None
            # Drop the iterator too; it belongs to the client just closed.
            self.blob_status_receive_iter = None
            adapter_config.logger("AMQP blob status receive client disconnected")

    async def send_to_device(self, device_id, message):
        """Send ``message`` (JSON-serialized) to the given device.

        Args:
            device_id: Device id used to build the message's ``to`` address.
            message: JSON-serializable payload.
        """
        msg_content = json.dumps(message)
        app_properties = {}

        msg_props = uamqp.message.MessageProperties()
        msg_props.to = "/devices/{}/messages/devicebound".format(device_id)
        msg_props.message_id = str(uuid4())

        amqp_message = uamqp.Message(
            msg_content, properties=msg_props, application_properties=app_properties
        )
        await self.send_client.send_message_async(amqp_message)
        adapter_config.logger("AMQP service client sent: {}".format(message))

    async def get_next_blob_status(self):
        """Return the next file-notification message.

        The receive client and its message iterator are created on first use
        and reused by later calls.

        Returns:
            The next message yielded by the receive iterator, or ``None`` if
            the iterator finishes without yielding one.
        """
        if not self.blob_status_receive_client:
            blob_status_receive_operation = "/messages/serviceBound/filenotifications"
            blob_status_receive_source = (
                "amqps://" + self.endpoint + blob_status_receive_operation
            )
            # NOTE(review): the source URI embeds the SAS token as well.
            logger.info("blob status receive source: %s", blob_status_receive_source)

            self.blob_status_receive_client = (
                uamqp.async_ops.client_async.ReceiveClientAsync(
                    blob_status_receive_source
                )
            )
            self.blob_status_receive_iter = (
                self.blob_status_receive_client.receive_messages_iter_async()
            )

        # Hand back only the first message; the iterator is kept on the
        # instance so the next call continues from there.
        async for message in self.blob_status_receive_iter:
            return message
        return None