in rabbitmq/ProjectMessageProcessor.py [0:0]
def raw_message_receive(self, channel, method, properties, body):
tag = method.delivery_tag
validated_content = None
try:
if self.serializer:
validated_content = self.validate_with_serializer(body)
elif self.schema:
validated_content = self.validate_with_schema(body)
else:
logger.warning("No schema nor serializer resent for validation in {0}, cannot continue".format(self.__class__.__name__))
channel.basic_nack(delivery_tag=tag, requeue=True)
except Exception as e:
logger.exception("Message did not validate: ", exc_info=e)
logger.error("Offending message content was {0}".format(body.decode('UTF-8')))
channel.basic_nack(delivery_tag=tag, requeue=False)
return
if validated_content is not None:
try:
self.valid_message_receive(method.exchange, method.routing_key, method.delivery_tag, validated_content)
channel.basic_ack(delivery_tag=tag)
except Exception as e:
logger.error("Could not process message: {0}".format(str(e)))
channel.basic_nack(delivery_tag=tag, requeue=False)
body_data = json.loads(body.decode('UTF-8'))[0]
should_retry = True
if "retry_count" in body_data:
if body_data["retry_count"] > 32:
should_retry = False
if should_retry:
if "retry_count" in body_data:
body_data["retry_count"] = body_data["retry_count"] + 1
else:
body_data["retry_count"] = 1
body_as_json = json.dumps([body_data]).encode('UTF-8')
logger.info("Publishing the message again with a retry count of {0} on exchange {1} with key {2}".format(body_data["retry_count"], method.exchange, method.routing_key))
try:
channel.basic_publish(exchange=method.exchange, routing_key=method.routing_key, body=body_as_json, properties=properties)
except Exception as e:
logger.error("Could not publish message: {0}".format(str(e)))
else:
logger.info("Publishing the message again with a retry count of {0} on exchange atomresponder-dlx with key atomresponder-dlq".format(body_data["retry_count"]))
channel.basic_publish(exchange="atomresponder-dlx", routing_key="atomresponder-dlq", body=body, properties=properties)
else:
logger.error("Validated content was empty but no validation error? There must be a bug")
channel.basic_nack(delivery_tag=tag, requeue=True)
channel.basic_cancel(method.consumer_tag)
raise ValueError("Validated content empty but no validation error")