private def simpleQueueDeclare()

in common/src/main/scala/com/gu/multimedia/storagetier/framework/MessageProcessingFramework.scala [347:414]


  /**
   * Declares a queue on `channel` with a fixed set of flags and no extra arguments.
   * The positional booleans follow the RabbitMQ Java client's
   * `queueDeclare(queue, durable, exclusive, autoDelete, arguments)` signature.
   *
   * @param queueName name of the queue to declare
   * @return the result of `channel.queueDeclare`
   */
  private def simpleQueueDeclare(queueName:String) = {
    val noArguments = Map.empty[String, AnyRef].asJava
    channel.queueDeclare(
      queueName,
      true,  //durable
      false, //exclusive
      false, //autoDelete
      noArguments
    )
  }
  /**
   * Kick off the framework. This declares the exchanges, queues and bindings used below and then starts
   * consuming from `ingest_queue_name` with `MsgConsumer`.
   *
   * Any non-fatal error raised during this set-up is reported by failing `completionPromise` instead of being
   * thrown to the caller. Fatal JVM errors are deliberately not intercepted and propagate out of this method.
   *
   * @return the future of `completionPromise`, which should only resolve when the framework terminates (or fail
   *         if the set-up performed here could not be completed)
   */
  def run() = {
    try {
      //output exchange where we send our completion notifications
      //NOTE(review): "x-dead-letter-exchange" is documented by RabbitMQ as a queue argument, so it presumably has no
      //effect on an exchange - confirm. It is kept unchanged because a broker rejects redeclaration of an existing
      //exchange with different arguments.
      channel.exchangeDeclare(output_exchange_name, "topic",
        true,
        false,
        Map("x-dead-letter-exchange" -> failedExchangeName.asInstanceOf[AnyRef]).asJava
      )

      //dead-letter queue for permanent failures; the "#" binding routes every message on the failed exchange to it
      channel.exchangeDeclare(failedExchangeName,
        "topic",
        true)
      simpleQueueDeclare(failedQueueName)
      channel.queueBind(failedQueueName, failedExchangeName, "#")

      //messages posted to the retryExchange are routed to a queue where they are delayed by the TTL provided on
      //the message and are then sent back to the retryInputExchange
      //NOTE(review): both retry exchanges are declared with durable=false while the queues bound to them are durable -
      //confirm this is intentional. Not changed here because that would alter the topology on an existing broker.
      channel.exchangeDeclare(retryExchangeName, "topic", false)

      //messages come onto the RetryInputExchange and we pick them up and re-process them from there
      channel.exchangeDeclare(retryInputExchangeName, "topic", false)

      //link the retryExchange back to the retryInputExchange with a queue named after the retryInputExchange.
      // The message is received from retryExchange then expires
      // after its provided TTL which then triggers it to be "dead-lettered" back onto the retryInputExchange.
      // maximumDelayTime is set as the queue-level "x-message-ttl".
      channel.queueDeclare(retryInputExchangeName,
        true,
        false,
        false,
        Map[String,AnyRef](
          "x-dead-letter-exchange"->retryInputExchangeName.asInstanceOf[AnyRef],
          "x-message-ttl"->maximumDelayTime.asInstanceOf[AnyRef]
        ).asJava
      )
      channel.queueBind(retryInputExchangeName, retryExchangeName, "#")

      //we declare a single queue that receives all the messages we are interested in, and bind it to the retry input exchange
      channel.queueDeclare(ingest_queue_name,
        true,
        false,
        false,
        Map[String,AnyRef](
          "x-dead-letter-exchange"->failedExchangeName.asInstanceOf[AnyRef]
        ).asJava
      )

      channel.queueBind(ingest_queue_name, retryInputExchangeName, "#")

      //now we also bind it to all of the exchanges that are listed in our configuration,
      //once for every routing key configured on each handler
      for {
        conf <- handlers
        routingKey <- conf.routingKey
      } channel.queueBind(ingest_queue_name, conf.exchangeName, routingKey)

      //prefetch of 1 with the "global" flag set, then start consuming with auto-ack disabled
      channel.basicQos(1, true)
      channel.basicConsume(ingest_queue_name, false, MsgConsumer)
    } catch {
      //Only non-fatal errors are turned into a failed future; fatal JVM errors must not be trapped here.
      //tryFailure is used so that an already-completed promise does not throw IllegalStateException from this handler
      //and hide the original error.
      case scala.util.control.NonFatal(err)=>completionPromise.tryFailure(err)
    }
    completionPromise.future
  }