in rabbitmq/MessageProcessor.py [0:0]
def raw_message_receive(self, channel, method, properties, body):
"""
called from the pika library when data is received on our channel.
the implementation will attempt to decode the body as JSON and validate it using jsonschema against
the schema provided by the `schema` member before passing it on to valid_message_receive
normally you DON'T want to over-ride this, you want valid_message_receive
:param channel:
:param method:
:param properties:
:param body:
:return:
"""
tag = method.delivery_tag
validated_content = None
try:
#logger.debug("Received message with delivery tag {2} from {0}: {1}".format(channel, body.decode('UTF-8'), tag))
if self.serializer:
validated_content = self.validate_with_serializer(body)
elif self.schema:
validated_content = self.validate_with_schema(body)
else:
logger.warning("No schema nor serializer resent for validation in {0}, cannot continue".format(self.__class__.__name__))
channel.basic_nack(delivery_tag=tag, requeue=True)
except Exception as e:
logger.exception("Message did not validate: ", exc_info=e)
logger.error("Offending message content was {0}".format(body.decode('UTF-8')))
channel.basic_nack(delivery_tag=tag, requeue=False)
return
if validated_content is not None:
try:
self.valid_message_receive(method.exchange, method.routing_key, method.delivery_tag, validated_content)
channel.basic_ack(delivery_tag=tag)
except Exception as e:
logger.error("Could not process message: {0}".format(str(e)))
channel.basic_nack(delivery_tag=tag, requeue=True)
channel.basic_cancel(method.consumer_tag)
raise ValueError("Could not process message")
else:
logger.error("Validated content was empty but no validation error? There must be a bug")
channel.basic_nack(delivery_tag=tag, requeue=True)
channel.basic_cancel(method.consumer_tag)
raise ValueError("Validated content empty but no validation error")