private def rejectMessage()

in common/src/main/scala/com/gu/multimedia/storagetier/framework/MessageProcessingFramework.scala [307:345]


  /**
    * Re-publishes a message to the retry exchange with an incremented `retry-count` header and a
    * per-message expiration, then acknowledges the original delivery.
    *
    * The expiration is `2^retryCount * 1000`, capped at `maximumDelayTime` (presumably milliseconds, as that is
    * the unit RabbitMQ uses for per-message expiration -- confirm against the definition of `maximumDelayTime`).
    * The exchange the message first arrived on is kept in the `x-original-exchange` header, and the current
    * routing key is written to `x-original-routing-key`.
    * NOTE(review): what happens when the expiration elapses depends on how the retry exchange/queue is declared,
    * which is not visible from here.
    *
    * @param envelope   delivery envelope of the message being rejected; supplies the routing key, the delivery
    *                   tag and (if no `x-original-exchange` header is present) the exchange name
    * @param properties properties of the message being rejected, if any. The headers and the message id inside
    *                   them may each be null
    * @param content    message body, re-serialised without whitespace for the retry publish
    * @return a `Success` once the message has been published and the original acknowledged, or a `Failure`
    *         carrying any non-fatal exception thrown along the way (in which case the original is not acknowledged
    *         by this method)
    */
  private def rejectMessage(envelope: Envelope, properties:Option[AMQP.BasicProperties], content:Json) = Try {
    //handle either properties or headers being null
    val maybeHeaders = for {
      props <- properties
      headers <- Option(props.getHeaders).map(_.asScala)
    } yield headers
    val originalMsgHeaders = maybeHeaders.getOrElse(Map[String, AnyRef]())

    //getMessageId is a Java getter that can return null; wrap it so that a missing id becomes None rather than
    //Some(null), otherwise the UUID fallback below would never be used for messages that have properties but no id
    val maybeMessageId = properties.flatMap(props => Option(props.getMessageId))

    val nextRetryCount = originalMsgHeaders.getOrElse("retry-count",0) match {
      case intValue:Int=>
        logger.info(s"Previous retry of message $maybeMessageId is $intValue")
        intValue+1
      case _=>
        logger.warn(s"Got unexpected value type for retry-count header on message id $maybeMessageId, resetting to 1")
        1
    }

    //exponential back-off, capped at maximumDelayTime
    val delayTime = List(math.pow(2, nextRetryCount)*1000, maximumDelayTime).min.toInt
    logger.debug(s"delayTime is $delayTime")

    //on a second or later retry the envelope's exchange is no longer the one the message first arrived on,
    //so prefer the value recorded in the header by an earlier pass through this method
    val originalExchange = properties
      .flatMap(_.getHeader[LongString]("x-original-exchange"))
      .map(_.toString)
      .getOrElse(envelope.getExchange)

    val updatedMsgHeaders = originalMsgHeaders ++ Map(
      "retry-count"->nextRetryCount.asInstanceOf[AnyRef],
      "x-original-exchange"->originalExchange.asInstanceOf[AnyRef],
      "x-original-routing-key"->envelope.getRoutingKey.asInstanceOf[AnyRef]
    )

    val newProps = new BasicProperties.Builder()
      .contentType("application/json")
      .expiration(delayTime.toString)
      .headers(updatedMsgHeaders.asJava)
      .messageId(maybeMessageId.getOrElse(UUID.randomUUID().toString))
      .build()

    //publish the retry copy first and only then ack the original: if the publish throws, the Try captures the
    //failure before the ack is reached
    channel.basicPublish(retryExchangeName, envelope.getRoutingKey, false, newProps, content.noSpaces.getBytes(cs))
    channel.basicAck(envelope.getDeliveryTag, false)
  }