in common/src/main/scala/com/gu/multimedia/storagetier/framework/MessageProcessingFramework.scala [307:345]
/**
  * Schedules a message for a delayed retry: publishes a copy of it to `retryExchangeName` with an incremented
  * `retry-count` header and a per-message expiration, then acknowledges the original delivery.
  *
  * The expiration is `2^retryCount * 1000`, capped at `maximumDelayTime`, so successive retries back off
  * exponentially. The copy also records `x-original-exchange` and `x-original-routing-key` headers.
  *
  * The acknowledgement is only sent after the publish call returns; if the publish throws, the original delivery
  * is left un-acked and the exception is captured in the returned Failure.
  *
  * @param envelope   delivery envelope of the message being rejected (supplies routing key, exchange, delivery tag)
  * @param properties AMQP properties of the message being rejected, if there were any
  * @param content    message body; it is re-published as compact (no-whitespace) JSON
  * @return Success(()) if the copy was published and the original acknowledged, otherwise a Failure wrapping
  *         whatever exception was thrown
  */
private def rejectMessage(envelope: Envelope, properties:Option[AMQP.BasicProperties], content:Json): Try[Unit] = Try {
  // The Java client hands back null for any property that was not set, so wrap the getter in Option(...).
  // Without this a present-but-id-less properties object yields Some(null): the UUID fallback below would never
  // fire and the republished message would carry a null message id.
  val maybeMessageId = properties.flatMap(props => Option(props.getMessageId))

  //handle either properties or headers being null
  val maybeHeaders = for {
    props <- properties
    headers <- Option(props.getHeaders).map(_.asScala)
  } yield headers
  val originalMsgHeaders = maybeHeaders.getOrElse(Map[String, AnyRef]())

  // A missing header counts as "zero previous retries". Any value that is not an Int resets the count to 1.
  val nextRetryCount = originalMsgHeaders.getOrElse("retry-count",0) match {
    case intValue:Int=>
      logger.info(s"Previous retry of message $maybeMessageId is $intValue")
      intValue+1
    case _=>
      logger.warn(s"Got unexpected value type for retry-count header on message id $maybeMessageId, resetting to 1")
      1
  }

  // Exponential back-off, capped so that the delay can never exceed maximumDelayTime
  val delayTime = List(math.pow(2, nextRetryCount)*1000, maximumDelayTime).min.toInt
  logger.debug(s"delayTime is $delayTime")

  // Prefer the exchange recorded by an earlier retry; only on the first rejection fall back to the exchange
  // that this delivery arrived on.
  // NOTE(review): `getHeader` looks like a project-supplied extension returning an Option - confirm.
  val originalExchange = properties
    .flatMap(_.getHeader[LongString]("x-original-exchange"))
    .map(_.toString)
    .getOrElse(envelope.getExchange)

  // Entries on the right-hand side replace any same-named headers carried over from the original message
  val updatedMsgHeaders = originalMsgHeaders ++ Map(
    "retry-count"->nextRetryCount.asInstanceOf[AnyRef],
    "x-original-exchange"->originalExchange.asInstanceOf[AnyRef],
    "x-original-routing-key"->envelope.getRoutingKey.asInstanceOf[AnyRef]
  )

  val newProps = new BasicProperties.Builder()
    .contentType("application/json")
    .expiration(delayTime.toString)
    .headers(updatedMsgHeaders.asJava)
    .messageId(maybeMessageId.getOrElse(UUID.randomUUID().toString))
    .build()

  // Publish the retry copy first, and only then ack the original, so a failed publish does not lose the message
  channel.basicPublish(retryExchangeName, envelope.getRoutingKey, false, newProps, content.noSpaces.getBytes(cs))
  channel.basicAck(envelope.getDeliveryTag, false)
}