in cdsresponder/rabbitmq/messageprocessor.py [0:0]
def raw_message_receive(self, channel, method:pika.spec.Basic.Deliver, properties:pika.spec.BasicProperties, body:bytes):
"""
called from the pika library when data is received on our channel -
see https://pika.readthedocs.io/en/stable/modules/channel.html#pika.channel.Channel.basic_consume
the implementation will attempt to decode the body as JSON and validate it using jsonschema against
the schema provided by the `schema` member before passing it on to valid_message_receive
normally you DON'T want to over-ride this, you want valid_message_receive
:param channel: pika.channel.Channel object
:param method: pika.spec.Basic.Deliver object - basic metadata about the deliver
:param properties: pika.spec.BasicProperties object
:param body: byte array of the message content
:return:
"""
tag = method.delivery_tag
validated_content = None
try:
logger.debug("Received message with delivery tag {2} from {0}: {1}".format(channel, body.decode('UTF-8'), tag))
if self.schema:
validated_content = self.validate_with_schema(body)
else:
logger.warning("No schema nor serializer resent for validation in {0}, cannot continue".format(self.__class__.__name__))
channel.basic_nack(delivery_tag=tag, requeue=True)
except Exception as e:
logger.exception("Message from {0} via {1} with delivery tag {2} did not validate: {3}"
.format(method.routing_key, method.exchange, method.delivery_tag, str(e)), exc_info=e)
logger.error("Offending message content from {0} via {1} with delivery tag {2} was {3}"
.format(method.routing_key, method.exchange, method.delivery_tag, body.decode('UTF-8')))
channel.basic_nack(delivery_tag=tag, requeue=False)
return
if validated_content is not None:
try:
self.valid_message_receive(channel, method.exchange, method.routing_key, method.delivery_tag, validated_content)
channel.basic_ack(delivery_tag=tag)
except self.NackMessage:
logger.warning("Message was indicated to be un-processable, nacking without requeue")
channel.basic_nack(delivery_tag=tag, requeue=False)
except self.NackWithRetry:
logger.warning("Message could not be processed but should be requeued")
channel.basic_nack(delivery_tag=tag, requeue=True)
except Exception as e:
logger.error("Could not process message: {0}".format(str(e)))
channel.basic_nack(delivery_tag=tag, requeue=False)
# channel.basic_cancel(method.consumer_tag)
# raise ValueError("Could not process message")
else:
logger.error("Validated content was empty but no validation error? There must be a bug")
channel.basic_nack(delivery_tag=tag, requeue=True)
channel.basic_cancel(method.consumer_tag)
raise ValueError("Validated content empty but no validation error")