uamqp/async_ops/receiver_async.py (59 lines of code) (raw):
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import logging
import asyncio
from uamqp import constants, errors, receiver
from uamqp.async_ops.utils import get_dict_with_loop_if_needed
# Module-level logger, named after this module via ``__name__``.
_logger = logging.getLogger(__name__)
class MessageReceiverAsync(receiver.MessageReceiver):
    """An asynchronous Message Receiver that opens its own exclusive Link on an
    existing Session.

    :ivar receive_settle_mode: The mode by which to settle message receive
     operations. If set to `PeekLock`, the receiver will lock a message once received until
     the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
     will assume successful receipt of the message and clear it from the queue. The
     default is `PeekLock`.
    :vartype receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
    :ivar send_settle_mode: The mode by which to settle message send
     operations. If set to `Unsettled`, the client will wait for a confirmation
     from the service that the message was successfully sent. If set to 'Settled',
     the client will not wait for confirmation and assume success.
    :vartype send_settle_mode: ~uamqp.constants.SenderSettleMode
    :ivar max_message_size: The maximum allowed message size negotiated for the Link.
    :vartype max_message_size: int

    :param session: The underlying Session with which to receive.
    :type session: ~uamqp.session.Session
    :param source: The AMQP endpoint to receive from.
    :type source: ~uamqp.address.Source
    :param target: The name of target (i.e. the client).
    :type target: str or bytes
    :param on_message_received: Forwarded unchanged to the base
     ~uamqp.receiver.MessageReceiver constructor; see that class for the
     exact contract (presumably a callback invoked per received message).
    :param name: A unique name for the receiver. If not specified a GUID will be used.
    :type name: str or bytes
    :param receive_settle_mode: The mode by which to settle message receive
     operations. If set to `PeekLock`, the receiver will lock a message once received until
     the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
     will assume successful receipt of the message and clear it from the queue. The
     default is `PeekLock`.
    :type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
    :param send_settle_mode: The mode by which to settle message send
     operations. If set to `Unsettled`, the client will wait for a confirmation
     from the service that the message was successfully sent. If set to 'Settled',
     the client will not wait for confirmation and assume success.
    :type send_settle_mode: ~uamqp.constants.SenderSettleMode
    :param desired_capabilities: The extension capabilities desired from the peer endpoint.
     To create a desired_capabilities object, please do as follows:
        - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
        - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
    :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
    :param max_message_size: The maximum allowed message size negotiated for the Link.
    :type max_message_size: int
    :param prefetch: The receiver Link credit that determines how many
     messages the Link will attempt to handle per connection iteration.
    :type prefetch: int
    :param properties: Metadata to be sent in the Link ATTACH frame.
    :type properties: dict
    :param error_policy: A policy for parsing errors on link, connection and message
     disposition to determine whether the error should be retryable.
    :type error_policy: ~uamqp.errors.ErrorPolicy
    :param debug: Whether to turn on network trace logs. If `True`, trace logs
     will be logged at INFO level. Default is `False`.
    :type debug: bool
    :param encoding: The encoding to use for parameters supplied as strings.
     Default is 'UTF-8'
    :type encoding: str
    :param loop: Optional event loop. It is handed to
     `get_dict_with_loop_if_needed`, and the resulting keyword arguments are
     passed to every `asyncio.sleep` call made by this class.
    """

    def __init__(self, session, source, target,
                 on_message_received,
                 name=None,
                 receive_settle_mode=constants.ReceiverSettleMode.PeekLock,
                 send_settle_mode=constants.SenderSettleMode.Unsettled,
                 max_message_size=constants.MAX_MESSAGE_LENGTH_BYTES,
                 prefetch=300,
                 properties=None,
                 error_policy=None,
                 debug=False,
                 encoding='UTF-8',
                 desired_capabilities=None,
                 loop=None):
        """Store the loop keyword arguments, then initialise the base receiver.

        All arguments except `loop` are forwarded as-is to
        ~uamqp.receiver.MessageReceiver; see the class docstring for details.
        """
        # Set before the base constructor runs so the attribute already
        # exists by the time any base-class initialisation code executes.
        self._internal_kwargs = get_dict_with_loop_if_needed(loop)
        super(MessageReceiverAsync, self).__init__(
            session, source, target,
            on_message_received,
            name=name,
            receive_settle_mode=receive_settle_mode,
            send_settle_mode=send_settle_mode,
            max_message_size=max_message_size,
            prefetch=prefetch,
            properties=properties,
            error_policy=error_policy,
            debug=debug,
            encoding=encoding,
            desired_capabilities=desired_capabilities)

    async def __aenter__(self):
        """Open the MessageReceiver in an async context manager."""
        await self.open_async()
        return self

    async def __aexit__(self, *args):
        """Close the MessageReceiver when exiting an async context manager."""
        await self.destroy_async()

    @property
    def loop(self):
        """The value stored under ``"loop"`` in the internal keyword
        arguments, or `None` when no such entry exists."""
        return self._internal_kwargs.get("loop")

    async def destroy_async(self):
        """Asynchronously close both the Receiver and the Link. Clean up any C objects.

        Delegates to the synchronous :meth:`destroy`; this coroutine does not
        itself yield to the event loop.
        """
        self.destroy()

    async def open_async(self):
        """Asynchronously open the MessageReceiver in order to start
        processing messages.

        :raises: ~uamqp.errors.AMQPConnectionError if the Receiver raises
         an error on opening. This can happen if the source URI is invalid
         or the credentials are rejected.
        """
        try:
            self._receiver.open(self)
        except ValueError as exc:
            # Chain explicitly so the original ValueError is recorded as the
            # direct cause of the connection error in tracebacks.
            raise errors.AMQPConnectionError(
                "Failed to open Message Receiver. "
                "Please confirm credentials and target URI.") from exc

    async def work_async(self):
        """Update the link status.

        Yields control to the event loop once (zero-length sleep) and then
        calls ``do_work()`` on the underlying link.
        """
        await asyncio.sleep(0, **self._internal_kwargs)
        self._link.do_work()

    async def reset_link_credit_async(self, link_credit, **kwargs):
        """Asynchronously reset the link credit. This method would send flow control frame to the sender.

        :param link_credit: The link credit amount that is requested.
        :type link_credit: int
        :keyword drain: Passed as the second argument to the link's
         ``reset_link_credit``. Defaults to `False` when not supplied.
        """
        await asyncio.sleep(0, **self._internal_kwargs)
        drain = kwargs.get("drain", False)
        self._link.reset_link_credit(link_credit, drain)

    async def close_async(self):
        """Close the Receiver asynchronously, leaving the link intact.

        Delegates to the synchronous :meth:`close`; this coroutine does not
        itself yield to the event loop.
        """
        self.close()