rabbitmq/MessageProcessor.py (61 lines of code) (raw):
import jsonschema
import json
import logging
from rest_framework.parsers import JSONParser
import io
logger = logging.getLogger(__name__)
class MessageProcessor(object):
    """
    MessageProcessor describes the interface that all message processor classes should implement.

    A subclass sets either `serializer` or `schema` (the serializer wins if both are set) and
    overrides `valid_message_receive`. `raw_message_receive` is the callback that receives the
    raw delivery, validates it and takes care of acknowledging or rejecting it.
    """
    schema = None  # override this in a subclass; passed to jsonschema.validate()
    routing_key = None  # override this in a subclass; not read by this class itself
    serializer = None  # override this in a subclass; called as serializer(data=...)

    def valid_message_receive(self, exchange_name, routing_key, delivery_tag, body):
        """
        override this method in a subclass in order to receive information.
        The default implementation only logs the message at debug level.
        :param exchange_name: exchange that the message was delivered from
        :param routing_key: routing key that the message was delivered with
        :param delivery_tag: delivery tag of the message
        :param body: the validated content, as returned by validate_with_serializer or
                     validate_with_schema
        :return: nothing; raise an exception to signal that processing failed
        """
        logger.debug("Received validated message from {0} via {1} with {2}: {3}".format(exchange_name, routing_key, delivery_tag, body))

    def validate_with_schema(self, body):
        """
        decodes the body as UTF-8 JSON and validates it against the `schema` member
        :param body: raw message body, as bytes
        :return: the parsed JSON content
        :raises: whatever the UTF-8 decode, json.loads or jsonschema.validate raise on bad content
        """
        content = json.loads(body.decode('UTF-8'))
        jsonschema.validate(content, self.schema)  # throws an exception if the content does not validate
        return content  # if we get to this line, then validation was successful

    def validate_with_serializer(self, body):
        """
        parses the body with JSONParser and validates it using the `serializer` member.
        If the parsed content is a list then only the first entry is validated; handling of
        further entries is not implemented and a warning is logged.
        :param body: raw message body, as bytes
        :return: the serializer's validated_data
        :raises ValueError: if the content is an empty list or the serializer rejects it
        """
        content = JSONParser().parse(stream=io.BytesIO(body))
        content_list = content if isinstance(content, list) else [content]

        if not content_list:
            # without this guard the indexing below fails with an unhelpful IndexError
            raise ValueError("Message content was an empty list, there is nothing to validate")
        if len(content_list) > 1:
            logger.warning("handling messages with >1 member is not implemented yet")

        serializer_inst = self.serializer(data=content_list[0])
        if serializer_inst.is_valid():
            return serializer_inst.validated_data
        raise ValueError(str(serializer_inst.errors))

    def raw_message_receive(self, channel, method, properties, body):
        """
        called from the pika library when data is received on our channel.
        the implementation will attempt to decode the body as JSON and validate it, using the
        `serializer` member if it is set or otherwise jsonschema against the `schema` member,
        before passing it on to valid_message_receive.
        normally you DON'T want to over-ride this, you want valid_message_receive

        Outcomes:
         - validation fails: the message is nack'ed without requeue and the method returns
         - processing succeeds: the message is ack'ed
         - processing fails, or there is no validated content (including when neither a
           serializer nor a schema is set): the message is nack'ed with requeue, the consumer
           is cancelled and ValueError is raised
        :param channel: channel object, used for basic_ack / basic_nack / basic_cancel
        :param method: delivery information; delivery_tag, exchange, routing_key and
                       consumer_tag are read from it
        :param properties: message properties (unused)
        :param body: raw message body, as bytes
        :return: None
        :raises ValueError: if the message could not be processed or no validated content exists
        """
        tag = method.delivery_tag
        validated_content = None

        try:
            if self.serializer:
                validated_content = self.validate_with_serializer(body)
            elif self.schema:
                validated_content = self.validate_with_schema(body)
            else:
                # Deliberately no nack here: validated_content stays None, so the branch below
                # nacks exactly once. Nacking in both places rejects the same delivery tag twice.
                logger.warning("No schema nor serializer present for validation in {0}, cannot continue".format(self.__class__.__name__))
        except Exception as e:
            logger.exception("Message did not validate: ", exc_info=e)
            # errors='replace' because an undecodable body may be the very reason validation
            # failed; a strict decode would raise again here and the nack would never be sent.
            logger.error("Offending message content was {0}".format(body.decode('UTF-8', errors='replace')))
            channel.basic_nack(delivery_tag=tag, requeue=False)
            return

        if validated_content is None:
            logger.error("Validated content was empty but no validation error? There must be a bug")
            channel.basic_nack(delivery_tag=tag, requeue=True)
            channel.basic_cancel(method.consumer_tag)
            raise ValueError("Validated content empty but no validation error")

        try:
            self.valid_message_receive(method.exchange, method.routing_key, method.delivery_tag, validated_content)
            channel.basic_ack(delivery_tag=tag)
        except Exception as e:
            # logger.exception keeps the traceback of the underlying failure in the log
            logger.exception("Could not process message: {0}".format(str(e)))
            channel.basic_nack(delivery_tag=tag, requeue=True)
            channel.basic_cancel(method.consumer_tag)
            raise ValueError("Could not process message") from e