gnm_deliverables/signals.py (92 lines of code) (raw):

import logging
import os
from time import sleep

import pika
import pika.exceptions
from django.conf import settings
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from rest_framework.renderers import JSONRenderer

from rabbitmq.declaration import declare_rabbitmq_setup
from .models import Deliverable, DeliverableAsset

logger = logging.getLogger(__name__)


class MessageRelay(object):
    """
    MessageRelay encapsulates the logic that sends messages to rabbitmq.
    This is done to lazily initialize the connection: creating a MessageRelay does not
    touch the broker, a connection is only opened when a message is actually sent and it
    is closed again once the publish attempt has finished.
    """

    # Model class names whose save/delete signals are silently ignored (not relayed, not logged).
    _IGNORED_MODEL_NAMES = frozenset((
        "Migration",
        "User",
        "DailyMotion",
        "Mainstream",
        "YouTubeCategories",
        "YouTubeChannels",
    ))

    @staticmethod
    def _close_quietly(connection):
        """
        Best-effort close of a broker connection.
        :param connection: a pika BlockingConnection, or None (in which case nothing happens)
        :return: None
        """
        if connection is None:
            return
        try:
            if connection.is_open:
                connection.close()
        except Exception as err:
            # Cleanup must never replace the outcome of the publish itself (a successful send,
            # or the original send error), so a failure to close is only logged.
            logger.warning("Could not cleanly close the broker connection: {0}".format(err))

    @staticmethod
    def setup_connection():
        """
        Opens a new blocking connection using the RABBITMQ_* values from the Django settings
        (falling back to the defaults below), opens a channel on it and runs
        declare_rabbitmq_setup on that channel.
        :return: the newly opened channel. The owning connection is reachable as `channel.connection`
         and it is the caller's responsibility to close it.
        """
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=getattr(settings, "RABBITMQ_HOST", "localhost"),
                port=getattr(settings, "RABBITMQ_PORT", 5672),
                virtual_host=getattr(settings, "RABBITMQ_VHOST", "prexit"),
                credentials=pika.credentials.PlainCredentials(
                    getattr(settings, "RABBITMQ_USER", "pluto-ng"),
                    getattr(settings, "RABBITMQ_PASSWD", "")
                )
            )
        )

        try:
            channel = connection.channel()
            declare_rabbitmq_setup(channel)
        except Exception:
            # Don't leave a half-initialised connection open if the channel setup fails.
            MessageRelay._close_quietly(connection)
            raise
        return channel

    def send_content(self, routing_key:str, payload:bytes, connect_attempt=0):
        """
        Publishes `payload` to the 'pluto-deliverables' exchange, retrying on broker errors.
        Each retry waits 2*attempt seconds before reconnecting.  Retrying stops once the attempt
        counter exceeds settings.RABBITMQ_MAX_SEND_ATTEMPTS (default 10).
        If the "CI" environment variable is set, broker errors are logged and ignored.
        :param routing_key: routing key to publish with
        :param payload: message body
        :param connect_attempt: value to start the attempt counter at; callers normally leave this at 0
        :return: None
        :raises: the last pika connection/channel error once the attempts are exhausted
        """
        max_attempts = getattr(settings, "RABBITMQ_MAX_SEND_ATTEMPTS", 10)
        attempt = connect_attempt

        while True:
            channel = None
            try:
                channel = MessageRelay.setup_connection()
                channel.basic_publish(
                    exchange='pluto-deliverables',
                    routing_key=routing_key,
                    body=payload
                )
                return
            except (pika.exceptions.ChannelWrongStateError,
                    pika.exceptions.AMQPConnectionError,
                    pika.exceptions.AMQPChannelError) as e:
                if "CI" in os.environ:
                    logger.warning("Ignoring connection error as we are in CI mode")
                    return

                if attempt > max_attempts:
                    logger.error("Could not re-establish broker connection after {0} attempts, giving up".format(attempt))
                    raise

                logger.warning("Message queue connection was lost: {0}. Attempting to reconnect, attempt {1}".format(str(e), attempt))
            finally:
                # A fresh connection is opened for every attempt, so always release it here;
                # this runs on success, on give-up and before every back-off sleep.
                if channel is not None:
                    MessageRelay._close_quietly(channel.connection)

            sleep(2 * attempt)
            attempt += 1

    def relay_message(self, affected_model, action):
        """
        Serializes a Deliverable or DeliverableAsset and publishes it with the routing key
        "deliverables.<lowercased class name>.<action>".  Models listed in _IGNORED_MODEL_NAMES are
        skipped silently; any other model class is skipped with an error log.
        Any exception is logged and swallowed, so that a broker problem does not break the
        save/delete that triggered the signal.
        :param affected_model: the model instance that was saved or deleted
        :param action: action name that is used as the last part of the routing key
        :return: None
        """
        # Imported at call time rather than at module level (presumably to avoid a circular
        # import with .serializers - TODO confirm).
        from .serializers import DeliverableSerializer, DeliverableAssetSerializer

        try:
            model_class = affected_model.__class__

            if isinstance(affected_model, Deliverable):
                logger.info("{0} an instance of Deliverable with id {1}".format(action, affected_model.project_id))
                content = DeliverableSerializer(affected_model)
            elif isinstance(affected_model, DeliverableAsset):
                logger.info("{0} an instance of DeliverableAsset with id {1} at {2}".format(action, affected_model.pk, affected_model.absolute_path))
                content = DeliverableAssetSerializer(affected_model)
            elif model_class.__name__ in self._IGNORED_MODEL_NAMES:
                # silently ignore these
                content = None
            else:
                content = None
                logger.error("model_saved got an unexpected model class: {0}.{1}".format(model_class.__module__, model_class.__name__))

            if content:
                routing_key = "deliverables.{0}.{1}".format(model_class.__name__.lower(), action)
                payload = JSONRenderer().render(content.data)
                self.send_content(routing_key, payload)
        except Exception as e:
            logger.error("Could not relay message of {0} on {1}: {2}".format(action, affected_model, str(e)))
            logger.exception(e)
            # we don't want to bring down the app here, log it out and hope somebody sees


msgrelay = MessageRelay()


@receiver(post_save)
def model_saved(sender, **kwargs):
    """
    Signal handler for post_save.  It is registered without a sender, so it is called for every
    model; filtering by model type is done in MessageRelay.relay_message.
    :param sender: the model class that sent the signal (not used)
    :param kwargs: signal arguments; "created" selects between the "create" and "update" actions
     and "instance" is the model that gets relayed
    :return: the result of msgrelay.relay_message
    """
    action = "create" if kwargs.get("created") else "update"
    return msgrelay.relay_message(kwargs.get("instance"), action)
@receiver(post_delete)
def model_deleted(sender, **kwargs):
    """
    Signal handler for post_delete: hands the removed instance to the message relay
    with the "delete" action.
    :param sender: the model class that sent the signal (not used)
    :param kwargs: signal arguments; the "instance" entry is the model that gets relayed
    :return: the result of msgrelay.relay_message
    """
    deleted_instance = kwargs.get("instance")
    return msgrelay.relay_message(deleted_instance, "delete")