def _message_received()

in uamqp/receiver.py [0:0]


    def _message_received(self, message):
        """Handle one incoming message reported by the underlying receiver.

        The raw message is wrapped in a ``uamqp.Message`` carrying this
        client's encoding and the delivery number reported by the receiver,
        and the wrapper is then passed to ``self.on_message_received``.
        Unless the settle mode is ``ReceiveAndDelete``, the wrapper is also
        given a settler bound to that delivery number.

        Exceptions raised while wrapping or dispatching are handled here:

        * ``KeyboardInterrupt`` - logged, the delivery is settled as
          modified and ``self._error`` is set to ``errors.AMQPClientShutdown``.
        * ``RuntimeError`` - ``self._error`` is set from
          ``errors._process_link_error`` with an ``amqp:unknown-error``
          condition, then the failure is logged.
        * any other ``Exception`` - logged and the delivery is settled as
          modified.

        :param message: The message handed over by the receiver.
        :type message: c_uamqp.Message
        """
        # pylint: disable=protected-access
        delivery_number = self._receiver.last_received_message_number()
        receive_and_delete = self._settle_mode == constants.ReceiverSettleMode.ReceiveAndDelete
        # No settler is attached in ReceiveAndDelete mode; otherwise bind the
        # delivery number now so the wrapper can settle this delivery later.
        settle_callback = (
            None if receive_and_delete
            else functools.partial(self._settle_message, delivery_number))
        try:
            received = uamqp.Message(
                message=message,
                encoding=self.encoding,
                settler=settle_callback,
                delivery_no=delivery_number)
            self.on_message_received(received)
        except KeyboardInterrupt:
            _logger.error(
                "Received shutdown signal while processing message no %r\nRejecting message.",
                delivery_number)
            self._receiver.settle_modified_message(delivery_number, True, True, None)
            self._error = errors.AMQPClientShutdown()
        except RuntimeError:
            # Record the link error before logging so it is set even if
            # building the log arguments fails.
            self._error = errors._process_link_error(
                self.error_policy, b"amqp:unknown-error", None, None)
            _logger.info(
                "Unable to settle message no %r. Disconnecting.\nLink: %r\nConnection: %r",
                delivery_number,
                self.name,
                self._session._connection.container_id)
        except Exception as exc:  # pylint: disable=broad-except
            _logger.error(
                "Error processing message no %r: %r\nRejecting message.",
                delivery_number,
                exc)
            self._receiver.settle_modified_message(delivery_number, True, True, None)