in common/src/main/scala/com/gu/multimedia/storagetier/framework/MessageProcessingFramework.scala [347:414]
/**
 * Declares a queue on the channel with no additional arguments.
 * The flags passed are, in the RabbitMQ client's parameter order, durable = true, exclusive = false and
 * autoDelete = false.
 * @param queueName name of the queue to declare
 * @return whatever `channel.queueDeclare` returns for the declaration
 */
private def simpleQueueDeclare(queueName:String) = {
  val noArguments = Map.empty[String, AnyRef].asJava
  channel.queueDeclare(queueName, true, false, false, noArguments)
}
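/**
* Kick off the framework: declares the exchanges, queues and bindings on the channel and starts consuming from the
* ingest queue. This returns a future which should only resolve when the framework terminates.
* @return the future of `completionPromise`; it is failed with the error if the setup steps throw an exception
*/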
def run() = {
  try {
    // Output exchange where we send our completion notifications (durable, not auto-deleted).
    // NOTE(review): "x-dead-letter-exchange" is normally a queue argument; it is left on this exchange declaration
    // unchanged so that re-declaring against an existing broker keeps the same arguments - confirm before removing.
    channel.exchangeDeclare(output_exchange_name, "topic",
      true,
      false,
      Map[String, AnyRef]("x-dead-letter-exchange" -> failedExchangeName).asJava
    )

    // Dead-letter exchange and queue for permanent failures; the "#" binding routes every message to the queue.
    channel.exchangeDeclare(failedExchangeName, "topic", true)
    simpleQueueDeclare(failedQueueName)
    channel.queueBind(failedQueueName, failedExchangeName, "#")

    // Messages posted to the retryExchange are routed to a queue where they are delayed by the TTL provided on
    // the message and are then sent back to the retryInputExchange.
    // NOTE(review): both retry exchanges are declared non-durable while the queues bound to them are durable -
    // confirm that this is intended.
    channel.exchangeDeclare(retryExchangeName, "topic", false)
    // Messages come onto the retryInputExchange and we pick them up and re-process them from there.
    channel.exchangeDeclare(retryInputExchangeName, "topic", false)

    // Link the retryExchange back to the retryInputExchange with a queue named after the retryInputExchange.
    // A message received from the retryExchange waits in this queue until its TTL expires (the queue-level
    // "x-message-ttl" is set from maximumDelayTime), which triggers it to be dead-lettered onto the
    // retryInputExchange.
    val delayQueueArguments = Map[String, AnyRef](
      "x-dead-letter-exchange" -> retryInputExchangeName,
      // the type of maximumDelayTime is not visible here, so the original conversion to AnyRef is kept as-is
      "x-message-ttl" -> maximumDelayTime.asInstanceOf[AnyRef]
    ).asJava
    channel.queueDeclare(retryInputExchangeName, true, false, false, delayQueueArguments)
    channel.queueBind(retryInputExchangeName, retryExchangeName, "#")

    // We declare a single queue that receives all the messages we are interested in; it dead-letters to the
    // failed exchange and is bound to the retry input exchange so that retried messages arrive here as well.
    val ingestQueueArguments = Map[String, AnyRef](
      "x-dead-letter-exchange" -> failedExchangeName
    ).asJava
    channel.queueDeclare(ingest_queue_name, true, false, false, ingestQueueArguments)
    channel.queueBind(ingest_queue_name, retryInputExchangeName, "#")

    // Now we also bind it to all of the exchanges that are listed in our configuration, once per routing key.
    for {
      conf <- handlers
      routingKey <- conf.routingKey
    } channel.queueBind(ingest_queue_name, conf.exchangeName, routingKey)

    // Prefetch count of 1, with the `global` flag set to true.
    channel.basicQos(1, true)
    // autoAck is false, so MsgConsumer is responsible for acknowledging deliveries.
    channel.basicConsume(ingest_queue_name, false, MsgConsumer)
  } catch {
    // Only non-fatal errors are reported through the promise; fatal ones (e.g. OutOfMemoryError) propagate.
    // tryFailure is used so that an already-completed promise does not make this handler itself throw.
    case scala.util.control.NonFatal(err) => completionPromise.tryFailure(err)
  }
  completionPromise.future
}