cdsresponder/rabbitmq/messageprocessor.py (54 lines of code) (raw):

import jsonschema import json import logging import pika.spec logger = logging.getLogger(__name__) class MessageProcessor(object): """ MessageProcessor describes the interface that all message processor classes should implement """ schema = None # override this in a subclass routing_key = None # override this in a subclass class NackMessage(Exception): pass class NackWithRetry(Exception): pass def valid_message_receive(self, channel:pika.spec.Channel, exchange_name, routing_key, delivery_tag, body): """ override this method in a subclass in order to receive information :param channel: the open pika channel, for sending messages back to our exchange :param exchange_name: :param routing_key: :param delivery_tag: :param body: :return: """ logger.debug("Received validated message from {0} via {1} with {2}: {3}".format(exchange_name, routing_key, delivery_tag, body)) pass def validate_with_schema(self, body): content = json.loads(body.decode('UTF-8')) jsonschema.validate(content, self.schema) # throws an exception if the content does not validate return content #if we get to this line, then validation was successful def raw_message_receive(self, channel, method:pika.spec.Basic.Deliver, properties:pika.spec.BasicProperties, body:bytes): """ called from the pika library when data is received on our channel - see https://pika.readthedocs.io/en/stable/modules/channel.html#pika.channel.Channel.basic_consume the implementation will attempt to decode the body as JSON and validate it using jsonschema against the schema provided by the `schema` member before passing it on to valid_message_receive normally you DON'T want to over-ride this, you want valid_message_receive :param channel: pika.channel.Channel object :param method: pika.spec.Basic.Deliver object - basic metadata about the deliver :param properties: pika.spec.BasicProperties object :param body: byte array of the message content :return: """ tag = method.delivery_tag validated_content = None try: logger.debug("Received message with delivery tag {2} from {0}: {1}".format(channel, body.decode('UTF-8'), tag)) if self.schema: validated_content = self.validate_with_schema(body) else: logger.warning("No schema nor serializer resent for validation in {0}, cannot continue".format(self.__class__.__name__)) channel.basic_nack(delivery_tag=tag, requeue=True) except Exception as e: logger.exception("Message from {0} via {1} with delivery tag {2} did not validate: {3}" .format(method.routing_key, method.exchange, method.delivery_tag, str(e)), exc_info=e) logger.error("Offending message content from {0} via {1} with delivery tag {2} was {3}" .format(method.routing_key, method.exchange, method.delivery_tag, body.decode('UTF-8'))) channel.basic_nack(delivery_tag=tag, requeue=False) return if validated_content is not None: try: self.valid_message_receive(channel, method.exchange, method.routing_key, method.delivery_tag, validated_content) channel.basic_ack(delivery_tag=tag) except self.NackMessage: logger.warning("Message was indicated to be un-processable, nacking without requeue") channel.basic_nack(delivery_tag=tag, requeue=False) except self.NackWithRetry: logger.warning("Message could not be processed but should be requeued") channel.basic_nack(delivery_tag=tag, requeue=True) except Exception as e: logger.error("Could not process message: {0}".format(str(e))) channel.basic_nack(delivery_tag=tag, requeue=False) # channel.basic_cancel(method.consumer_tag) # raise ValueError("Could not process message") else: logger.error("Validated content was empty but no validation error? There must be a bug") channel.basic_nack(delivery_tag=tag, requeue=True) channel.basic_cancel(method.consumer_tag) raise ValueError("Validated content empty but no validation error")