in uamqp/client.py [0:0]
def _on_message_sent(self, message, result, delivery_state=None):
    """Handle the completion of a message send operation.

    The raw ``result`` is converted to a ``constants.MessageSendResult``
    and the message is updated accordingly:

    * Error: the delivery state is turned into an exception carrying an
      ``action``. If that action asks for a retry and the message has not
      yet reached the error policy's ``max_retries``, the message is put
      back into the ``WaitingToBeSent`` state and this method returns
      immediately, without invoking the user callback. Otherwise the
      message is marked ``SendFailed`` and the exception is stored as
      its response.
    * Timeout: the message is marked ``SendFailed`` with a timeout
      exception as its response.
    * Anything else: the message is marked ``SendComplete``.

    Except on the retry path, a truthy ``message.on_send_complete`` is
    called with the converted result and the exception (or, on success,
    the delivery state exactly as it was received).

    A ``KeyboardInterrupt`` raised while processing is logged and recorded
    on the message handler as a client shutdown; it is not re-raised here.

    :param message: The message that was sent.
    :type message: ~uamqp.message.Message
    :param result: The result of the send operation.
    :type result: int
    :param delivery_state: Details of the delivery outcome. On an error
     result this may be an Exception instance, a truthy value that is
     wrapped in ``errors.ErrorResponse``, or a falsy value when no
     further information is available.
    """
    # pylint: disable=protected-access
    try:
        outcome = delivery_state
        result = constants.MessageSendResult(result)

        if result == constants.MessageSendResult.Error:
            # Step 1: normalise whatever we were given into an exception
            # object that carries an ``action`` describing what to do next.
            if isinstance(delivery_state, Exception):
                outcome = errors.ClientMessageError(delivery_state, info=delivery_state)
                outcome.action = errors.ErrorAction(retry=True)
            elif delivery_state:
                response = errors.ErrorResponse(delivery_state)
                outcome = errors._process_send_error(
                    self._error_policy,
                    response.condition,
                    response.description,
                    response.info)
            else:
                outcome = errors.MessageSendFailed(constants.ErrorCodes.UnknownError)
                outcome.action = errors.ErrorAction(retry=True)

            # Step 2: decide between re-queueing and giving up.
            action = outcome.action
            wants_retry = action.retry == errors.ErrorAction.retry
            if wants_retry and message.retries < self._error_policy.max_retries:
                if action.increment_retries:
                    message.retries += 1
                self._backoff = action.backoff
                _logger.debug(
                    "Message error, retrying. Attempts: %r, Error: %r",
                    message.retries,
                    outcome)
                message.state = constants.MessageState.WaitingToBeSent
                # The send is not finished yet, so the user callback is
                # deliberately skipped on this path.
                return

            if wants_retry:
                _logger.info(
                    "Message error, %r retries exhausted. Error: %r",
                    message.retries,
                    outcome)
            else:
                _logger.info("Message error, not retrying. Error: %r", outcome)
            message.state = constants.MessageState.SendFailed
            message._response = outcome

        elif result == constants.MessageSendResult.Timeout:
            outcome = compat.TimeoutException("Message send timed out.")
            _logger.info("Message error, not retrying. Error: %r", outcome)
            message.state = constants.MessageState.SendFailed
            message._response = outcome

        else:
            _logger.debug("Message sent: %r, %r", result, outcome)
            message.state = constants.MessageState.SendComplete
            message._response = errors.MessageAlreadySettled()

        # Notify the user-supplied completion callback, if one is set.
        callback = message.on_send_complete
        if callback:
            callback(result, outcome)
    except KeyboardInterrupt:
        _logger.error("Received shutdown signal while processing message send completion.")
        self.message_handler._error = errors.AMQPClientShutdown()