def raw_message_receive()

in cdsresponder/rabbitmq/messageprocessor.py [0:0]


    def raw_message_receive(self, channel, method:pika.spec.Basic.Deliver, properties:pika.spec.BasicProperties, body:bytes):
        """
        called from the pika library when data is received on our channel -
        see https://pika.readthedocs.io/en/stable/modules/channel.html#pika.channel.Channel.basic_consume
        the implementation will attempt to decode the body as JSON and validate it using jsonschema against
        the schema provided by the `schema` member before passing it on to valid_message_receive
        normally you DON'T want to over-ride this, you want valid_message_receive
        :param channel: pika.channel.Channel object
        :param method: pika.spec.Basic.Deliver object - basic metadata about the deliver
        :param properties: pika.spec.BasicProperties object
        :param body: byte array of the message content
        :return:
        """
        tag = method.delivery_tag
        validated_content = None
        try:
            logger.debug("Received message with delivery tag {2} from {0}: {1}".format(channel, body.decode('UTF-8'), tag))

            if self.schema:
                validated_content = self.validate_with_schema(body)
            else:
                logger.warning("No schema nor serializer resent for validation in {0}, cannot continue".format(self.__class__.__name__))
                channel.basic_nack(delivery_tag=tag, requeue=True)

        except Exception as e:
            logger.exception("Message from {0} via {1} with delivery tag {2} did not validate: {3}"
                             .format(method.routing_key, method.exchange, method.delivery_tag, str(e)), exc_info=e)
            logger.error("Offending message content from {0} via {1} with delivery tag {2} was {3}"
                         .format(method.routing_key, method.exchange, method.delivery_tag, body.decode('UTF-8')))

            channel.basic_nack(delivery_tag=tag, requeue=False)
            return

        if validated_content is not None:
            try:
                self.valid_message_receive(channel, method.exchange, method.routing_key, method.delivery_tag, validated_content)
                channel.basic_ack(delivery_tag=tag)
            except self.NackMessage:
                logger.warning("Message was indicated to be un-processable, nacking without requeue")
                channel.basic_nack(delivery_tag=tag, requeue=False)
            except self.NackWithRetry:
                logger.warning("Message could not be processed but should be requeued")
                channel.basic_nack(delivery_tag=tag, requeue=True)
            except Exception as e:
                logger.error("Could not process message: {0}".format(str(e)))
                channel.basic_nack(delivery_tag=tag, requeue=False)
                # channel.basic_cancel(method.consumer_tag)
                # raise ValueError("Could not process message")
        else:
            logger.error("Validated content was empty but no validation error? There must be a bug")
            channel.basic_nack(delivery_tag=tag, requeue=True)
            channel.basic_cancel(method.consumer_tag)
            raise ValueError("Validated content empty but no validation error")