rabbitmq/management/commands/run_rabbitmq_responder.py (78 lines of code) (raw):
from django.core.management.base import BaseCommand
import logging
import pika
from django.conf import settings
import re
from functools import partial
import sys
import signal
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Runs the responder program for in-cluster messages"
def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args,**kwargs)
self.exit_code = 0
self.runloop = None
@staticmethod
def connect_channel(exchange_name, handler, channel, durable=False):
"""
async callback that is used to connect a channel once it has been declared
:param channel: channel to set up
:param exchange_name: str name of the exchange to connect to
:param handler: a MessageProcessor class (NOT instance)
:return:
"""
logger.info("Establishing connection to exchange {0} from {1}...".format(exchange_name, handler.__class__.__name__))
sanitised_routingkey = re.sub(r'[^\w\d]', '', handler.routing_key)
queuename = "atomresponder-{0}".format(sanitised_routingkey)
channel.exchange_declare(exchange="atomresponder-dlx", exchange_type="direct")
channel.queue_declare("atomresponder-dlq", durable=True)
channel.queue_bind("atomresponder-dlq","atomresponder-dlx")
channel.exchange_declare(exchange=exchange_name, exchange_type="topic", durable=durable)
channel.queue_declare(queuename, arguments={
'x-message-ttl': getattr(settings,"RABBITMQ_QUEUE_TTL", 5000),
'x-dead-letter-exchange': "atomresponder-dlx"
})
channel.queue_bind(queuename, exchange_name, routing_key=handler.routing_key)
channel.basic_consume(queuename,
handler.raw_message_receive,
auto_ack=False,
exclusive=False,
callback=lambda consumer: logger.info("Consumer started for {0} from {1}".format(queuename, exchange_name)),
)
def channel_opened(self, connection):
"""
async callback that is invoked when the connection is ready.
it is used to connect up the channels
:param connection: rabbitmq connection
:return:
"""
from rabbitmq.mappings import EXCHANGE_MAPPINGS
logger.info("Connection opened")
for i in range(0, len(EXCHANGE_MAPPINGS)):
# partial adjusts the argument list, adding the args here onto the _start_ of the list
# so the args are (exchange, handler, channel) not (channel, exchange, handler)
durable_flag = EXCHANGE_MAPPINGS[i]["durable"] if "durable" in EXCHANGE_MAPPINGS[i] else False
chl = connection.channel(on_open_callback=partial(Command.connect_channel,
EXCHANGE_MAPPINGS[i]["exchange"],
EXCHANGE_MAPPINGS[i]["handler"],
durable=durable_flag),
)
chl.add_on_close_callback(self.channel_closed)
chl.add_on_cancel_callback(self.channel_closed)
def channel_closed(self, *args):
logger.error("Critical connection closed, terminating")
self.exit_code = 1
self.runloop.stop()
def connection_closed(self, connection, error=None):
"""
async callback that is invoked when the connection fails.
print an error and shut down, this will then get detected as a crash-loop state
:param connection:
:param error:
:return:
"""
logger.error("RabbitMQ connection failed: {0}".format(str(error)))
self.exit_code = 1
connection.ioloop.stop()
def handle(self, *args, **options):
connection = pika.SelectConnection(
pika.ConnectionParameters(
host=settings.RABBITMQ_HOST,
port=getattr(settings, "RABBITMQ_PORT", 5672),
virtual_host=getattr(settings, "RABBITMQ_VHOST", "/"),
credentials=pika.PlainCredentials(username=settings.RABBITMQ_USER, password=settings.RABBITMQ_PASSWORD),
connection_attempts=getattr(settings, "RABBITMQ_CONNECTION_ATTEMPTS", 3),
retry_delay=getattr(settings, "RABBITMQ_RETRY_DELAY", 3)
),
on_open_callback=self.channel_opened,
on_close_callback=self.connection_closed,
on_open_error_callback=self.connection_closed,
)
self.runloop = connection.ioloop
def on_quit(signum, frame):
logger.info("Caught signal {0}, exiting...".format(signum))
connection.ioloop.stop()
signal.signal(signal.SIGINT, on_quit)
signal.signal(signal.SIGTERM, on_quit)
connection.ioloop.start()
logger.info("terminated")
sys.exit(self.exit_code)