override def handleDelivery()

in common/src/main/scala/com/gu/multimedia/storagetier/framework/MessageProcessingFramework.scala [91:186]


    /**
     * Handles one delivered message: selects the processor configured for the message's exchange, runs it, and
     * then confirms, retries or permanently rejects the message depending on the outcome.
     *
     * Messages arriving from the retry exchange are routed using their `x-original-exchange` header and are
     * permanently rejected once `retry-count` reaches `maximumRetryLimit`.
     *
     * This method blocks the calling thread until the message has been completely dealt with (see the comment
     * above the `Await` call for the reason). The logging MDC is populated for the duration of the call and is
     * always cleared before returning, whichever path is taken.
     *
     * @param consumerTag consumer tag supplied by the client library; not used here
     * @param envelope    delivery envelope, providing the exchange, routing key, delivery tag and redeliver flag
     * @param properties  message properties, providing the headers and the message id
     * @param body        raw message content; converted to a UTF string and parsed before being handed to the processor
     */
    override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
      //a missing header map, a missing key or a non-Int value all fall back to attempt 0
      val retryAttempt = Try {
        properties.getHeaders.asScala.getOrElse("retry-count",0).asInstanceOf[Int]
      }.toOption.getOrElse(0)

      //try/finally guarantees that MDC values from this message never leak into log lines for the next message
      //handled on this thread, including on the early return below and if anything throws synchronously
      try {
        MDC.put("msgId", Option(properties.getMessageId).getOrElse(""))
        MDC.put("retryAttempt", retryAttempt.toString)
        MDC.put("routingKey", Option(envelope.getRoutingKey).getOrElse(""))
        MDC.put("exchange", Option(envelope.getExchange).getOrElse(""))
        MDC.put("isRedeliver", Option(envelope.isRedeliver).getOrElse(false).toString)

        val matchingConfigurations =
          if(envelope.getExchange==retryInputExchangeName) {  //if the message came from the retry exchange, then look up the original exchange and use that

            logger.debug(s"Message ${properties.getMessageId} is a retry, on attempt ${retryAttempt}")
            logger.debug(s"Message headers: ${properties.getHeaders}")

            if(retryAttempt>=maximumRetryLimit) {
              permanentlyRejectMessage(envelope, properties, body, "Too many retries") match {
                case Failure(err)=>
                  logger.error(s"MsgID ${properties.getMessageId} - Could not permanently reject message after too many retries: ${err.getMessage}", err)
                case Success(_)=>
              }
              return  //plain method-level return (not inside a lambda); the finally block below still runs
            }
            properties.getHeader[LongString]("x-original-exchange") match {
              case Some(effectiveExchange)=>
                logger.debug(s"Original exchange is $effectiveExchange, routing to that processor")
                handlers.filter(_.exchangeName==effectiveExchange.toString)
              case None=>
                logger.error(s"Could not determine the original exchange for retried message ${properties.getMessageId}")
                Seq()
            }
          } else {  //otherwise just use the exchange given by the envelope
            handlers.filter(_.exchangeName==envelope.getExchange)
          }

        logger.debug(s"${matchingConfigurations.length} processors matched, will use the first")

        val completionFuture = convertToUTFString(body).flatMap(wrappedParse) match {
          case Left(err)=>
            //the content could not be decoded or parsed, so retrying can't help
            Future.fromTry(permanentlyRejectMessage(envelope, properties, body, err))
          case Right(msg)=>
            if(matchingConfigurations.isEmpty) {
              logger.error(s"No processors are configured to handle messages from ${envelope.getExchange}")
              Future.fromTry(rejectMessage(envelope, Some(properties), msg))
            } else {
              val targetConfig = matchingConfigurations.head
              targetConfig.processor.handleMessage(envelope.getRoutingKey, msg, MessageProcessingFramework.this).map({
                case Left(errDesc)=>
                  logger.warn(s"MsgID ${properties.getMessageId} Retryable failure: \"$errDesc\"")
                  rejectMessage(envelope, Option(properties), msg)
                case Right(returnValue)=>
                  //the output routing key is chosen by the position of the input routing key spec that matched
                  RoutingKeyMatcher.findMatchingIndex(targetConfig.routingKey, envelope.getRoutingKey) match {
                    case Some(inputKeyIndex) =>
                      logger.debug(s"actual routing key ${envelope.getRoutingKey} matches source $inputKeyIndex")
                      confirmMessage(envelope.getDeliveryTag,
                        targetConfig.outputRoutingKeys(inputKeyIndex),
                        Option(properties).flatMap(p => Option(p.getMessageId)),
                        returnValue,
                        targetConfig.testingForceReplyId)
                      logger.info(s"MsgID ${properties.getMessageId} Successfully handled message")
                    case None =>
                      logger.error(s"No routing key input spec matched the actual key of ${envelope.getRoutingKey}! Configured input specs were ${targetConfig.routingKey}. Outputting to ${targetConfig.outputRoutingKeys.head}")
                      confirmMessage(envelope.getDeliveryTag,
                        targetConfig.outputRoutingKeys.head,
                        Option(properties).flatMap(p => Option(p.getMessageId)),
                        returnValue,
                        targetConfig.testingForceReplyId)
                  }
              }).recover({
                case err:SilentDropMessage=>
                  logger.info(s"Dropping message with id ${properties.getMessageId}: ${err.getMessage}")
                  channel.basicAck(envelope.getDeliveryTag, false)
                case err:java.io.IOException=>  //matrixstore errors get converted into IOException, which is not very useful for us :(
                  logger.error(s"MsgId ${properties.getMessageId} - Got java.io.IOException while trying to handle the message, retrying: ${err.getMessage}")
                  //getMessage can legitimately be null; an NPE here would fail the future and leave the message neither acked nor rejected
                  if(Option(err.getMessage).exists(_.contains("failed to retrieve work permit"))) {
                    logger.error("Got work permit error, this indicates either MatrixStore busy or failure. Waiting for 4mins to reduce load.")
                    Thread.sleep(240000)  //=240 seconds = 4mins
                  }
                  rejectMessage(envelope, Option(properties), msg)
                case err:Throwable=>
                  logger.error(s"MsgID ${properties.getMessageId} - Got an exception ${err.getClass.getCanonicalName} while trying to handle the message: ${err.getMessage}", err)
                  //fall back to the exception's string form so that a null is never passed as the rejection reason
                  permanentlyRejectMessage(envelope, properties, body, Option(err.getMessage).getOrElse(err.toString))
              })
            }
        }
        //Why are we using Await here, when you are not meant to use it in Scala production code? Well, the
        //process seems to be executing multiple messages in parallel, which is not what we want and is causing issues
        //when under load.  So, to prevent the rabbitmq library from sending us another message immediately we block the thread
        //here until we have definitively handled it.
        //Await.ready only throws on timeout or interruption; it does NOT rethrow a failure of the future itself, so
        //the outcome is inspected explicitly to make sure that such a failure is at least logged.
        Try { Await.ready(completionFuture, 420.minutes) } match {
          case Success(_)=>
            completionFuture.value match {
              case Some(Failure(err))=>
                logger.error(s"MsgID ${properties.getMessageId} - Message handling finished with an unhandled failure: ${err.getMessage}", err)
              case _=>
            }
          case Failure(err)=>
            logger.error(s"HandleDeliver failed for message id ${properties.getMessageId}: ${err.getMessage}", err)
        }
      } finally {
        MDC.clear()
      }
    }