rabbitmq/ProjectMessageProcessor.py (102 lines of code) (raw):

import logging from .MessageProcessor import MessageProcessor from .media_atom import update_kinesis, MSG_PROJECT_CREATED, MSG_PROJECT_UPDATED from .serializers import ProjectModelSerializer from .models import ProjectModel, CachedCommission, LinkedProject import pika from django.conf import settings import json logger = logging.getLogger(__name__) def send_missing_commission_message(commission_id): connection = pika.BlockingConnection(pika.ConnectionParameters( host=settings.RABBITMQ_HOST, port=getattr(settings, "RABBITMQ_PORT", 5672), virtual_host=getattr(settings, "RABBITMQ_VHOST", "/"), credentials=pika.PlainCredentials(username=settings.RABBITMQ_USER, password=settings.RABBITMQ_PASSWORD) )) channel = connection.channel() channel.queue_declare("missing-commissions") channel.queue_bind(exchange="pluto-atomresponder", queue="missing-commissions", routing_key="atomresponder.commission.missing-id") logger.info("About to send commission missing id. message for commission: {0}.".format(commission_id)) channel.basic_publish(exchange="pluto-atomresponder", routing_key="atomresponder.commission.missing-id", body='{"id":"%s"}' % commission_id) class ProjectMessageProcessor(MessageProcessor): serializer = ProjectModelSerializer routing_key = "core.project.*" def valid_message_receive(self, exchange_name, routing_key, delivery_tag, body): message = None if "create" in routing_key: logger.debug("Received project created message") message = MSG_PROJECT_CREATED elif "update" in routing_key: logger.debug("Received project update message") message = MSG_PROJECT_UPDATED else: logger.error("Received unknown message of type {0}".format(routing_key)) raise ValueError("Unknown routing key") project_model = ProjectModel(**body) try: cached_commission = CachedCommission.objects.get(id=project_model.commissionId) try: linked_project = LinkedProject.objects.get(project_id=project_model.id) linked_project.project_id = project_model.id linked_project.commission = cached_commission linked_project.save() except LinkedProject.DoesNotExist: linked_project = LinkedProject(project_id=project_model.id, commission=cached_commission) linked_project.save() logger.info("ProjectMessageProcessor got {0}".format(project_model)) update_kinesis(project_model, cached_commission, message) except CachedCommission.DoesNotExist: send_missing_commission_message(project_model.commissionId) logger.warning("No cached commission data found for id {0}".format(project_model.commissionId)) raise def raw_message_receive(self, channel, method, properties, body): tag = method.delivery_tag validated_content = None try: if self.serializer: validated_content = self.validate_with_serializer(body) elif self.schema: validated_content = self.validate_with_schema(body) else: logger.warning("No schema nor serializer resent for validation in {0}, cannot continue".format(self.__class__.__name__)) channel.basic_nack(delivery_tag=tag, requeue=True) except Exception as e: logger.exception("Message did not validate: ", exc_info=e) logger.error("Offending message content was {0}".format(body.decode('UTF-8'))) channel.basic_nack(delivery_tag=tag, requeue=False) return if validated_content is not None: try: self.valid_message_receive(method.exchange, method.routing_key, method.delivery_tag, validated_content) channel.basic_ack(delivery_tag=tag) except Exception as e: logger.error("Could not process message: {0}".format(str(e))) channel.basic_nack(delivery_tag=tag, requeue=False) body_data = json.loads(body.decode('UTF-8'))[0] should_retry = True if "retry_count" in body_data: if body_data["retry_count"] > 32: should_retry = False if should_retry: if "retry_count" in body_data: body_data["retry_count"] = body_data["retry_count"] + 1 else: body_data["retry_count"] = 1 body_as_json = json.dumps([body_data]).encode('UTF-8') logger.info("Publishing the message again with a retry count of {0} on exchange {1} with key {2}".format(body_data["retry_count"], method.exchange, method.routing_key)) try: channel.basic_publish(exchange=method.exchange, routing_key=method.routing_key, body=body_as_json, properties=properties) except Exception as e: logger.error("Could not publish message: {0}".format(str(e))) else: logger.info("Publishing the message again with a retry count of {0} on exchange atomresponder-dlx with key atomresponder-dlq".format(body_data["retry_count"])) channel.basic_publish(exchange="atomresponder-dlx", routing_key="atomresponder-dlq", body=body, properties=properties) else: logger.error("Validated content was empty but no validation error? There must be a bug") channel.basic_nack(delivery_tag=tag, requeue=True) channel.basic_cancel(method.consumer_tag) raise ValueError("Validated content empty but no validation error")