def raw_message_receive()

in rabbitmq/ProjectMessageProcessor.py [0:0]


    def raw_message_receive(self, channel, method, properties, body):
        tag = method.delivery_tag
        validated_content = None
        try:
            if self.serializer:
                validated_content = self.validate_with_serializer(body)
            elif self.schema:
                validated_content = self.validate_with_schema(body)
            else:
                logger.warning("No schema nor serializer resent for validation in {0}, cannot continue".format(self.__class__.__name__))
                channel.basic_nack(delivery_tag=tag, requeue=True)

        except Exception as e:
            logger.exception("Message did not validate: ", exc_info=e)
            logger.error("Offending message content was {0}".format(body.decode('UTF-8')))
            channel.basic_nack(delivery_tag=tag, requeue=False)
            return

        if validated_content is not None:
            try:
                self.valid_message_receive(method.exchange, method.routing_key, method.delivery_tag, validated_content)
                channel.basic_ack(delivery_tag=tag)
            except Exception as e:
                logger.error("Could not process message: {0}".format(str(e)))
                channel.basic_nack(delivery_tag=tag, requeue=False)
                body_data = json.loads(body.decode('UTF-8'))[0]
                should_retry = True
                if "retry_count" in body_data:
                    if body_data["retry_count"] > 32:
                        should_retry = False
                if should_retry:
                    if "retry_count" in body_data:
                        body_data["retry_count"] = body_data["retry_count"] + 1
                    else:
                        body_data["retry_count"] = 1
                    body_as_json = json.dumps([body_data]).encode('UTF-8')
                    logger.info("Publishing the message again with a retry count of {0} on exchange {1} with key {2}".format(body_data["retry_count"], method.exchange, method.routing_key))
                    try:
                        channel.basic_publish(exchange=method.exchange, routing_key=method.routing_key, body=body_as_json, properties=properties)
                    except Exception as e:
                        logger.error("Could not publish message: {0}".format(str(e)))
                else:
                    logger.info("Publishing the message again with a retry count of {0} on exchange atomresponder-dlx with key atomresponder-dlq".format(body_data["retry_count"]))
                    channel.basic_publish(exchange="atomresponder-dlx", routing_key="atomresponder-dlq", body=body, properties=properties)
        else:
            logger.error("Validated content was empty but no validation error? There must be a bug")
            channel.basic_nack(delivery_tag=tag, requeue=True)
            channel.basic_cancel(method.consumer_tag)
            raise ValueError("Validated content empty but no validation error")