in online_archive/src/main/scala/OwnMessageProcessor.scala [238:261]
/**
 * Routes a message received on our own exchange to the matching handler, chosen by routing key.
 *
 *  - `storagetier.onlinearchive.newfile.success`, `...newfile.nearline.success` and
 *    `...newfile.online.success`: the body is decoded as an `ArchivedRecord` and handed to
 *    `handleArchivehunterValidation`, whose successful value is converted to JSON. If decoding
 *    fails, the returned Future is failed: with a `SilentDropMessage` when `isIgnoredRecord(msg)`
 *    is true, otherwise with a `RuntimeException` carrying the raw JSON and the decode error.
 *  - `storagetier.onlinearchive.replay.success`: delegated to `handleReplayStageTwo`.
 *  - `storagetier.onlinearchive.request.archivehunter-revalidation`: delegated to
 *    `handleRevalidationList`.
 *  - anything else: a warning is logged and the returned Future is failed with a `RuntimeException`.
 *
 * @param routingKey routing key the message arrived with
 * @param msg        message body as JSON
 * @param framework  message processing framework instance; not used by this method
 * @return a Future holding either an error string (Left) or the processing result (Right)
 */
override def handleMessage(routingKey: String, msg: Json, framework: MessageProcessingFramework): Future[Either[String, MessageProcessorReturnValue]] = {
  routingKey match {
    case "storagetier.onlinearchive.newfile.success" |
         "storagetier.onlinearchive.newfile.nearline.success" |
         "storagetier.onlinearchive.newfile.online.success" =>
      msg.as[ArchivedRecord] match {
        case Right(archivedRecord) =>
          handleArchivehunterValidation(archivedRecord)
            .map(validationResult => validationResult.map(_.asJson))
        case Left(decodeFailure) =>
          // An undecodable body is only reported as an error when it is not a known "ignored" record.
          if (!isIgnoredRecord(msg)) {
            Future.failed(new RuntimeException(s"Could not unmarshal json message ${msg.noSpaces} into an ArchivedRecord: $decodeFailure"))
          } else {
            Future.failed(SilentDropMessage(Some(s"Incoming message indicates file ignored, dropping it.")))
          }
      }

    case "storagetier.onlinearchive.replay.success" =>
      handleReplayStageTwo(msg)

    case "storagetier.onlinearchive.request.archivehunter-revalidation" =>
      handleRevalidationList(msg)

    case _ =>
      logger.warn(s"Dropping message $routingKey from own exchange as I don't know how to handle it. This should be fixed in the code.")
      Future.failed(new RuntimeException("Not meant to receive this"))
  }
}