in common/src/main/scala/com/gu/multimedia/storagetier/framework/MessageProcessingFramework.scala [91:186]
override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
val retryAttempt = Try {
properties.getHeaders.asScala.getOrElse("retry-count",0).asInstanceOf[Int]
}.toOption.getOrElse(0)
MDC.put("msgId", Option(properties.getMessageId).getOrElse(""))
MDC.put("retryAttempt", retryAttempt.toString)
MDC.put("routingKey", Option(envelope.getRoutingKey).getOrElse(""))
MDC.put("exchange", Option(envelope.getExchange).getOrElse(""))
MDC.put("isRedeliver", Option(envelope.isRedeliver).getOrElse(false).toString)
val matchingConfigurations =
if(envelope.getExchange==retryInputExchangeName) { //if the message came from the retry exchange, then look up the original exchange and use that
logger.debug(s"Message ${properties.getMessageId} is a retry, on attempt ${retryAttempt}")
logger.debug(s"Message headers: ${properties.getHeaders}")
if(retryAttempt>=maximumRetryLimit) {
permanentlyRejectMessage(envelope, properties, body, "Too many retries")
return
}
properties.getHeader[LongString]("x-original-exchange") match {
case Some(effectiveExchange)=>
logger.debug(s"Original exchange is $effectiveExchange, routing to that processor")
handlers.filter(_.exchangeName==effectiveExchange.toString)
case None=>
logger.error(s"Could not determine the original exchange for retried message ${properties.getMessageId}")
Seq()
}
} else { //otherwise just use the exchange given by the envelope
handlers.filter(_.exchangeName==envelope.getExchange)
}
logger.debug(s"${matchingConfigurations.length} processors matched, will use the first")
val completionFuture = convertToUTFString(body).flatMap(wrappedParse) match {
case Left(err)=>
Future.fromTry(permanentlyRejectMessage(envelope, properties, body, err))
case Right(msg)=>
if(matchingConfigurations.isEmpty) {
logger.error(s"No processors are configured to handle messages from ${envelope.getExchange}")
Future.fromTry(rejectMessage(envelope, Some(properties), msg))
} else {
val targetConfig = matchingConfigurations.head
targetConfig.processor.handleMessage(envelope.getRoutingKey, msg, MessageProcessingFramework.this).map({
case Left(errDesc)=>
logger.warn(s"MsgID ${properties.getMessageId} Retryable failure: \"$errDesc\"")
rejectMessage(envelope, Option(properties), msg)
case Right(returnValue)=>
RoutingKeyMatcher.findMatchingIndex(targetConfig.routingKey, envelope.getRoutingKey) match {
case Some(inputKeyIndex) =>
logger.debug(s"actual routing key ${envelope.getRoutingKey} matches source $inputKeyIndex")
confirmMessage(envelope.getDeliveryTag,
targetConfig.outputRoutingKeys(inputKeyIndex),
Option(properties).flatMap(p => Option(p.getMessageId)),
returnValue,
targetConfig.testingForceReplyId)
logger.info(s"MsgID ${properties.getMessageId} Successfully handled message")
case None =>
logger.error(s"No routing key input spec matched the actual key of ${envelope.getRoutingKey}! Configured input specs were ${targetConfig.routingKey}. Outputting to ${targetConfig.outputRoutingKeys.head}")
confirmMessage(envelope.getDeliveryTag,
targetConfig.outputRoutingKeys.head,
Option(properties).flatMap(p => Option(p.getMessageId)),
returnValue,
targetConfig.testingForceReplyId)
}
}).recover({
case err:SilentDropMessage=>
logger.info(s"Dropping message with id ${properties.getMessageId}: ${err.getMessage}")
channel.basicAck(envelope.getDeliveryTag, false)
case err:java.io.IOException=> //matrixstore errors get converted into IOException, which is not very useful for us :(
logger.error(s"MsgId ${properties.getMessageId} - Got java.io.IOException while trying to handle the message, retrying: ${err.getMessage}")
if(err.getMessage.contains("failed to retrieve work permit")) {
logger.error("Got work permit error, this indicates either MatrixStore busy or failure. Waiting for 4mins to reduce load.")
Thread.sleep(240000) //=240 seconds = 4mins
}
rejectMessage(envelope, Option(properties), msg)
case err:Throwable=>
logger.error(s"MsgID ${properties.getMessageId} - Got an exception ${err.getClass.getCanonicalName} while trying to handle the message: ${err.getMessage}", err)
permanentlyRejectMessage(envelope, properties, body, err.getMessage)
})
}
}
//Why are we using Await here, when you are not meant to use it in Scala production code? Well, the
//process seems to be executing multiple messages in parallel, which is not what we want and is causing issues
//when under load. So, to prevent the rabbitmq library from sending us another message immediately we block the thread
//here until we have definitively handled it.
//The Try here _should_ never fail as exceptions are handled in the block above.
Try { Await.ready(completionFuture, 420.minutes) } match {
case Success(_)=>
MDC.clear()
case Failure(err)=>
logger.error(s"HandleDeliver failed for message id ${properties.getMessageId}: ${err.getMessage}", err)
MDC.clear()
}
}