uamqp/__init__.py (61 lines of code) (raw):

#------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. #-------------------------------------------------------------------------- # pylint: disable=no-member import logging import sys from uamqp import c_uamqp # pylint: disable=import-self from uamqp.message import Message, BatchMessage from uamqp.address import Source, Target from uamqp.connection import Connection from uamqp.session import Session from uamqp.client import AMQPClient, SendClient, ReceiveClient from uamqp.sender import MessageSender from uamqp.receiver import MessageReceiver from uamqp.constants import TransportType, MessageBodyType try: from uamqp.async_ops import ConnectionAsync from uamqp.async_ops import SessionAsync from uamqp.async_ops import MessageSenderAsync from uamqp.async_ops import MessageReceiverAsync from uamqp.async_ops.client_async import ( AMQPClientAsync, SendClientAsync, ReceiveClientAsync, AsyncMessageIter) except (SyntaxError, ImportError): pass # Async not supported. __version__ = "1.6.11" _logger = logging.getLogger(__name__) _is_win = sys.platform.startswith('win') c_uamqp.set_python_logger() def send_message(target, data, auth=None, debug=False): """Send a single message to AMQP endpoint. :param target: The target AMQP endpoint. :type target: str, bytes or ~uamqp.address.Target :param data: The contents of the message to send. :type data: str, bytes or ~uamqp.message.Message :param auth: The authentication credentials for the endpoint. This should be one of the subclasses of uamqp.authentication.AMQPAuth. Currently this includes: - uamqp.authentication.SASLAnonymous - uamqp.authentication.SASLPlain - uamqp.authentication.SASTokenAuth If no authentication is supplied, SASLAnnoymous will be used by default. :type auth: ~uamqp.authentication.common.AMQPAuth :param debug: Whether to turn on network trace logs. If `True`, trace logs will be logged at INFO level. Default is `False`. :type debug: bool :return: A list of states for each message sent. :rtype: list[~uamqp.constants.MessageState] """ message = data if isinstance(data, Message) else Message(body=data) with SendClient(target, auth=auth, debug=debug) as send_client: send_client.queue_message(message) # pylint: disable=no-member return send_client.send_all_messages() # pylint: disable=no-member def receive_message(source, auth=None, timeout=0, debug=False): """Receive a single message from an AMQP endpoint. :param source: The AMQP source endpoint to receive from. :type source: str, bytes or ~uamqp.address.Source :param auth: The authentication credentials for the endpoint. This should be one of the subclasses of uamqp.authentication.AMQPAuth. Currently this includes: - uamqp.authentication.SASLAnonymous - uamqp.authentication.SASLPlain - uamqp.authentication.SASTokenAuth If no authentication is supplied, SASLAnnoymous will be used by default. :type auth: ~uamqp.authentication.common.AMQPAuth :param timeout: The timeout in milliseconds after which to return None if no messages are retrieved. If set to `0` (the default), the receiver will not timeout and will continue to wait for messages until interrupted. :param debug: Whether to turn on network trace logs. If `True`, trace logs will be logged at INFO level. Default is `False`. :type debug: bool :rtype: ~uamqp.message.Message or None """ received = receive_messages(source, auth=auth, max_batch_size=1, timeout=timeout, debug=debug) if received: return received[0] return None def receive_messages(source, auth=None, max_batch_size=None, timeout=0, debug=False, **kwargs): """Receive a batch of messages from an AMQP endpoint. :param source: The AMQP source endpoint to receive from. :type source: str, bytes or ~uamqp.address.Source :param auth: The authentication credentials for the endpoint. This should be one of the subclasses of ~uamqp.authentication.AMQPAuth. Currently this includes: - uamqp.authentication.SASLAnonymous - uamqp.authentication.SASLPlain - uamqp.authentication.SASTokenAuth If no authentication is supplied, SASLAnnoymous will be used by default. :type auth: ~uamqp.authentication.common.AMQPAuth :param max_batch_size: The maximum number of messages to return in a batch. If the receiver receives a smaller number than this, it will not wait to return them so the actual number returned can be anything up to this value. If the receiver reaches a timeout, an empty list will be returned. :param timeout: The timeout in milliseconds after which to return if no messages are retrieved. If set to `0` (the default), the receiver will not timeout and will continue to wait for messages until interrupted. :param debug: Whether to turn on network trace logs. If `True`, trace logs will be logged at INFO level. Default is `False`. :type debug: bool :rtype: list[~uamqp.message.Message] """ if max_batch_size: kwargs['prefetch'] = max_batch_size with ReceiveClient(source, auth=auth, debug=debug, **kwargs) as receive_client: return receive_client.receive_message_batch( # pylint: disable=no-member max_batch_size=max_batch_size or receive_client._prefetch, timeout=timeout) # pylint: disable=protected-access, no-member class _Platform(object): """Runs any platform preparatory steps for the AMQP C library. This is primarily used for OpenSSL setup. :ivar initialized: When the setup has completed. :vartype initialized: bool """ initialized = False @classmethod def initialize(cls): """Initialize the TLS/SSL platform to prepare it for making AMQP requests. This only needs to happen once. """ if cls.initialized: _logger.debug("Platform already initialized.") else: _logger.debug("Initializing platform.") c_uamqp.platform_init() cls.initialized = True @classmethod def deinitialize(cls): """Deinitialize the TLS/SSL platform to prepare it for making AMQP requests. This only needs to happen once. """ if not cls.initialized: _logger.debug("Platform already deinitialized.") else: #cls.initialized = False _logger.debug("Deinitializing platform.") #c_uamqp.platform_deinit() def get_platform_info(): """Gets the current platform information. :rtype: str """ return str(c_uamqp.get_info())