in cdsresponder/cdsresponder.py [0:0]
def connect_channel(exchange_name, handler, channel):
    """
    Callback that wires up a channel once it has been opened: declares the
    queues, binds them and starts a consumer for the given handler.

    The following broker objects are set up on `channel`:
      * a durable queue "cdsresponder-dlq", bound to the exchange
        "cdsresponder-dlx";
      * a queue "cdsresponder-<routing key with non-word characters removed>"
        whose 'x-dead-letter-exchange' argument is "cdsresponder-dlx", bound
        to `exchange_name` with the handler's (unsanitised) routing key;
      * a consumer on that queue with manual acknowledgement
        (auto_ack=False), delivering to `handler.raw_message_receive`.

    :param exchange_name: str name of the exchange to connect to
    :param handler: a MessageProcessor class or instance; it must expose
        `routing_key` and `raw_message_receive`
    :param channel: channel to set up
    :return: None
    """
    # `handler` may be handed over as a class rather than an instance. For a
    # class, `handler.__class__` is its metaclass, so the log line would read
    # "type" instead of the processor's name; pick the right name for both.
    if isinstance(handler, type):
        handler_name = handler.__name__
    else:
        handler_name = type(handler).__name__
    logger.info("Establishing connection to exchange {0} from {1}...".format(exchange_name, handler_name))

    # Routing keys may contain characters such as '.', '*' or '#'; only word
    # characters are kept when deriving the queue name.
    sanitised_routingkey = re.sub(r'[^\w\d]', '', handler.routing_key)

    # NOTE(review): presumably declares the exchanges used below (including
    # "cdsresponder-dlx") - confirm against Command.declare_rabbitmq_setup.
    Command.declare_rabbitmq_setup(channel)

    queuename = "cdsresponder-{0}".format(sanitised_routingkey)

    # Dead-letter queue, bound to the dead-letter exchange.
    channel.queue_declare("cdsresponder-dlq", durable=True)
    channel.queue_bind("cdsresponder-dlq", "cdsresponder-dlx")

    # Working queue for this handler; it names "cdsresponder-dlx" as its
    # dead-letter exchange.
    channel.queue_declare(queuename, arguments={
        'x-dead-letter-exchange': "cdsresponder-dlx"
    })
    channel.queue_bind(queuename, exchange_name, routing_key=handler.routing_key)

    channel.basic_consume(
        queuename,
        handler.raw_message_receive,
        auto_ack=False,
        exclusive=False,
        # Invoked once the consumer has been registered.
        callback=lambda consumer: logger.info("Consumer started for {0} from {1}".format(queuename, exchange_name)),
    )