uamqp/receiver.py (190 lines of code) (raw):

#------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. #-------------------------------------------------------------------------- import functools import logging import uuid import uamqp from uamqp import c_uamqp, constants, errors, utils _logger = logging.getLogger(__name__) class MessageReceiver(object): """A Message Receiver that opens its own exclsuive Link on an existing Session. :ivar receive_settle_mode: The mode by which to settle message receive operations. If set to `PeekLock`, the receiver will lock a message once received until the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service will assume successful receipt of the message and clear it from the queue. The default is `PeekLock`. :vartype receive_settle_mode: ~uamqp.constants.ReceiverSettleMode :ivar send_settle_mode: The mode by which to settle message send operations. If set to `Unsettled`, the client will wait for a confirmation from the service that the message was successfully sent. If set to 'Settled', the client will not wait for confirmation and assume success. :vartype send_settle_mode: ~uamqp.constants.SenderSettleMode :ivar max_message_size: The maximum allowed message size negotiated for the Link. :vartype max_message_size: int :param session: The underlying Session with which to receive. :type session: ~uamqp.session.Session :param source: The AMQP endpoint to receive from. :type source: ~uamqp.address.Source :param target: The name of target (i.e. the client). :type target: str or bytes :param name: A unique name for the receiver. If not specified a GUID will be used. :type name: str or bytes :param receive_settle_mode: The mode by which to settle message receive operations. If set to `PeekLock`, the receiver will lock a message once received until the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service will assume successful receipt of the message and clear it from the queue. The default is `PeekLock`. :type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode :param send_settle_mode: The mode by which to settle message send operations. If set to `Unsettled`, the client will wait for a confirmation from the service that the message was successfully sent. If set to 'Settled', the client will not wait for confirmation and assume success. :type send_settle_mode: ~uamqp.constants.SenderSettleMode :param desired_capabilities: The extension capabilities desired from the peer endpoint. To create a desired_capabilities object, please do as follows: - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param max_message_size: The maximum allowed message size negotiated for the Link. :type max_message_size: int :param prefetch: The receiver Link credit that determines how many messages the Link will attempt to handle per connection iteration. :type prefetch: int :param properties: Metadata to be sent in the Link ATTACH frame. :type properties: dict :param error_policy: A policy for parsing errors on link, connection and message disposition to determine whether the error should be retryable. :type error_policy: ~uamqp.errors.ErrorPolicy :param debug: Whether to turn on network trace logs. If `True`, trace logs will be logged at INFO level. Default is `False`. :type debug: bool :param encoding: The encoding to use for parameters supplied as strings. Default is 'UTF-8' :type encoding: str """ def __init__(self, session, source, target, on_message_received, name=None, receive_settle_mode=constants.ReceiverSettleMode.PeekLock, send_settle_mode=constants.SenderSettleMode.Unsettled, max_message_size=constants.MAX_MESSAGE_LENGTH_BYTES, prefetch=300, properties=None, error_policy=None, debug=False, encoding='UTF-8', desired_capabilities=None): # pylint: disable=protected-access if name: self.name = name.encode(encoding) if isinstance(name, str) else name else: self.name = str(uuid.uuid4()).encode(encoding) target = target.encode(encoding) if isinstance(target, str) else target role = constants.Role.Receiver self.source = source._address.value self.target = c_uamqp.Messaging.create_target(target) self.on_message_received = on_message_received self.encoding = encoding self.error_policy = error_policy or errors.ErrorPolicy() self._settle_mode = receive_settle_mode self._conn = session._conn self._session = session self._link = c_uamqp.create_link(session._session, self.name, role.value, self.source, self.target) self._link.subscribe_to_detach_event(self) if prefetch is not None: self._link.set_prefetch_count(prefetch) if properties: self._link.set_attach_properties(utils.data_factory(properties, encoding=encoding)) if receive_settle_mode: self.receive_settle_mode = receive_settle_mode if send_settle_mode: self.send_settle_mode = send_settle_mode if max_message_size: self.max_message_size = max_message_size if desired_capabilities: self._link.set_desired_capabilities(desired_capabilities) self._receiver = c_uamqp.create_message_receiver(self._link, self) self._receiver.set_trace(debug) self._state = constants.MessageReceiverState.Idle self._error = None def __enter__(self): """Open the MessageReceiver in a context manager.""" self.open() return self def __exit__(self, *args): """Close the MessageReceiver when exiting a context manager.""" self.destroy() def _state_changed(self, previous_state, new_state): """Callback called whenever the underlying Receiver undergoes a change of state. This function wraps the states as Enums to prepare for calling the public callback. :param previous_state: The previous Receiver state. :type previous_state: int :param new_state: The new Receiver state. :type new_state: int """ try: try: _previous_state = constants.MessageReceiverState(previous_state) except ValueError: _previous_state = previous_state try: _new_state = constants.MessageReceiverState(new_state) except ValueError: _new_state = new_state if _previous_state == constants.MessageReceiverState.Opening \ and _new_state == constants.MessageReceiverState.Error: _logger.info("Receiver link failed to open - expecting to receive DETACH frame.") elif self._session._link_error: # pylint: disable=protected-access _logger.info("Receiver link ATTACH frame invalid - expecting to receive DETACH frame.") else: self.on_state_changed(_previous_state, _new_state) except KeyboardInterrupt: _logger.error("Received shutdown signal while updating receiver state from {} to {}".format( previous_state, new_state)) self._error = errors.AMQPClientShutdown() def _detach_received(self, error): """Callback called when a link DETACH frame is received. This callback will process the received DETACH error to determine if the link is recoverable or whether it should be shutdown. :param error: The error information from the detach frame. :type error: ~uamqp.errors.ErrorResponse """ # pylint: disable=protected-access if error: condition = error.condition description = error.description info = error.info else: condition = b"amqp:unknown-error" description = None info = None self._error = errors._process_link_error(self.error_policy, condition, description, info) _logger.info("Received Link detach event: %r\nLink: %r\nDescription: %r" "\nDetails: %r\nRetryable: %r\nConnection: %r", condition, self.name, description, info, self._error.action.retry, self._session._connection.container_id) def _settle_message(self, message_number, response): """Send a settle dispostition for a received message. :param message_number: The delivery number of the message to settle. :type message_number: int :response: The type of disposition to respond with, e.g. whether the message was accepted, rejected or abandoned. :type response: ~uamqp.errors.MessageResponse """ if not response or isinstance(response, errors.MessageAlreadySettled): return if isinstance(response, errors.MessageAccepted): self._receiver.settle_accepted_message(message_number) elif isinstance(response, errors.MessageReleased): self._receiver.settle_released_message(message_number) elif isinstance(response, errors.MessageRejected): self._receiver.settle_rejected_message( message_number, response.error_condition, response.error_description, response.error_info) elif isinstance(response, errors.MessageModified): self._receiver.settle_modified_message( message_number, response.failed, response.undeliverable, response.annotations) else: raise ValueError("Invalid message response type: {}".format(response)) def _message_received(self, message): """Callback run on receipt of every message. If there is a user-defined callback, this will be called. Additionally if the client is retrieving messages for a batch or iterator, the message will be added to an internal queue. :param message: c_uamqp.Message """ # pylint: disable=protected-access message_number = self._receiver.last_received_message_number() if self._settle_mode == constants.ReceiverSettleMode.ReceiveAndDelete: settler = None else: settler = functools.partial(self._settle_message, message_number) try: wrapped_message = uamqp.Message( message=message, encoding=self.encoding, settler=settler, delivery_no=message_number) self.on_message_received(wrapped_message) except RuntimeError: condition = b"amqp:unknown-error" self._error = errors._process_link_error(self.error_policy, condition, None, None) _logger.info("Unable to settle message no %r. Disconnecting.\nLink: %r\nConnection: %r", message_number, self.name, self._session._connection.container_id) except KeyboardInterrupt: _logger.error("Received shutdown signal while processing message no %r\nRejecting message.", message_number) self._receiver.settle_modified_message(message_number, True, True, None) self._error = errors.AMQPClientShutdown() except Exception as e: # pylint: disable=broad-except _logger.error("Error processing message no %r: %r\nRejecting message.", message_number, e) self._receiver.settle_modified_message(message_number, True, True, None) def get_state(self): """Get the state of the MessageReceiver and its underlying Link. :rtype: ~uamqp.constants.MessageReceiverState """ try: raise self._error except TypeError: pass except errors.LinkRedirect as e: _logger.info("%r", e) raise except Exception as e: _logger.warning("%r", e) raise return self._state def work(self): """Update the link status.""" self._link.do_work() def reset_link_credit(self, link_credit, **kwargs): """Reset the link credit. This method would send flow control frame to the sender. :param link_credit: The link credit amount that is requested. :type link_credit: int """ drain = kwargs.get("drain", False) self._link.reset_link_credit(link_credit, drain) def destroy(self): """Close both the Receiver and the Link. Clean up any C objects.""" self._receiver.destroy() self._link.destroy() def open(self): """Open the MessageReceiver in order to start processing messages. :raises: ~uamqp.errors.AMQPConnectionError if the Receiver raises an error on opening. This can happen if the source URI is invalid or the credentials are rejected. """ try: self._receiver.open(self) except ValueError: raise errors.AMQPConnectionError( "Failed to open Message Receiver. " "Please confirm credentials and target URI.") def close(self): """Close the Receiver, leaving the link intact.""" self._receiver.close() def on_state_changed(self, previous_state, new_state): """Callback called whenever the underlying Receiver undergoes a change of state. This function can be overridden. :param previous_state: The previous Receiver state. :type previous_state: ~uamqp.constants.MessageReceiverState :param new_state: The new Receiver state. :type new_state: ~uamqp.constants.MessageReceiverState """ # pylint: disable=protected-access _logger.info("Message receiver %r state changed from %r to %r on connection: %r", self.name, previous_state, new_state, self._session._connection.container_id) self._state = new_state @property def receive_settle_mode(self): return self._link.receive_settle_mode @receive_settle_mode.setter def receive_settle_mode(self, value): self._link.receive_settle_mode = value.value @property def send_settle_mode(self): return self._link.send_settle_mode @send_settle_mode.setter def send_settle_mode(self, value): self._link.send_settle_mode = value.value @property def max_message_size(self): return self._link.max_message_size @max_message_size.setter def max_message_size(self, value): self._link.max_message_size = int(value)