in uamqp/receiver.py [0:0]
def _message_received(self, message):
    """Handle one incoming message reported by the underlying receiver.

    The raw message is wrapped in a ``uamqp.Message`` and passed to
    ``self.on_message_received``. Unless the link is in ReceiveAndDelete
    settle mode, the wrapper is given a settler bound to this delivery's
    number so it can be settled later through ``self._settle_message``.

    Nothing raised while wrapping or dispatching propagates out of this
    callback: each failure is logged and handled here, and for
    ``RuntimeError`` and ``KeyboardInterrupt`` the resulting error object
    is stored on ``self._error``.

    :param message: The received message.
    :type message: c_uamqp.Message
    """
    # pylint: disable=protected-access
    delivery_no = self._receiver.last_received_message_number()
    receive_and_delete = self._settle_mode == constants.ReceiverSettleMode.ReceiveAndDelete
    # In ReceiveAndDelete mode the wrapped message is given no settler.
    settler = None if receive_and_delete else functools.partial(self._settle_message, delivery_no)

    def _settle_as_modified():
        # Shared by the shutdown and generic failure paths below.
        # NOTE(review): the two True flags are presumably "delivery failed"
        # and "undeliverable here", with no annotations - confirm against
        # the c_uamqp receiver API.
        self._receiver.settle_modified_message(delivery_no, True, True, None)

    try:
        wrapped = uamqp.Message(
            message=message,
            encoding=self.encoding,
            settler=settler,
            delivery_no=delivery_no)
        self.on_message_received(wrapped)
    except RuntimeError:
        # Record a link error built from a generic AMQP condition; the
        # message itself is not settled on this path.
        self._error = errors._process_link_error(self.error_policy, b"amqp:unknown-error", None, None)
        _logger.info(
            "Unable to settle message no %r. Disconnecting.\nLink: %r\nConnection: %r",
            delivery_no,
            self.name,
            self._session._connection.container_id)
    except KeyboardInterrupt:
        # The interrupt is converted into a stored shutdown error rather
        # than being re-raised from inside the callback.
        _logger.error("Received shutdown signal while processing message no %r\nRejecting message.", delivery_no)
        _settle_as_modified()
        self._error = errors.AMQPClientShutdown()
    except Exception as exc:  # pylint: disable=broad-except
        # Any other failure: log it and settle the message; no error is
        # stored on the receiver for this case.
        _logger.error("Error processing message no %r: %r\nRejecting message.", delivery_no, exc)
        _settle_as_modified()