in media_remover/src/main/scala/OnlineNearlineMessageProcessor.scala [35:88]
/**
 * Produces a fresh correlation ID.
 *
 * @return the canonical string form of a newly generated random UUID
 */
protected def newCorrelationId: String = {
  val freshId = UUID.randomUUID()
  freshId.toString
}
/**
 * Dispatches an incoming message to the handler matching its routing key.
 *
 * Recognised routing keys:
 *  - `storagetier.nearline.internalarchive.nearline.success`: opens the nearline and internal
 *    archive vaults and calls `handleInternalArchiveCompleteForNearline`.
 *  - `storagetier.nearline.newfile.success`: opens the nearline vault and calls
 *    `handleNearlineCompleteForOnline`.
 *  - `storagetier.nearline.internalarchive.online.success`: opens the internal archive vault and
 *    calls `handleInternalArchiveCompleteForOnline`.
 *
 * For every recognised key the message body is first decoded as a `NearlineRecord`; only then is
 * the internal archive vault configuration consulted (where the key needs it).
 *
 * @param routingKey routing key the message arrived with
 * @param msg        message body, expected to decode as a `NearlineRecord`
 * @param framework  message processing framework; not used by this method
 * @return the Future produced by the selected handler. The Future is failed with a
 *         `RuntimeException` if the body cannot be decoded, if the internal archive vault ID is
 *         required but not configured, or if the routing key is not recognised.
 */
override def handleMessage(routingKey: String, msg: Json, framework: MessageProcessingFramework): Future[Either[String, MessageProcessorReturnValue]] =
  routingKey match {
    case "storagetier.nearline.internalarchive.nearline.success" =>
      withDecodedNearlineRecord(msg) { nearlineRecord =>
        mxsConfig.internalArchiveVaultId match {
          case Some(internalArchiveVaultId) =>
            matrixStoreBuilder.withVaultsFuture(Seq(mxsConfig.nearlineVaultId, internalArchiveVaultId)) { vaults =>
              // NOTE(review): relies on the vaults being supplied in the same order as the
              // requested IDs (nearline first, internal archive second) - confirm against withVaultsFuture.
              val nearlineVault = vaults.head
              val internalArchiveVault = vaults(1)
              handleInternalArchiveCompleteForNearline(nearlineVault, internalArchiveVault, nearlineRecord)
            }
          case None =>
            failInternalArchiveVaultNotConfigured
        }
      }

    case "storagetier.nearline.newfile.success" =>
      withDecodedNearlineRecord(msg) { nearlineRecord =>
        matrixStoreBuilder.withVaultFuture(mxsConfig.nearlineVaultId) { vault =>
          handleNearlineCompleteForOnline(vault, nearlineRecord)
        }
      }

    case "storagetier.nearline.internalarchive.online.success" =>
      withDecodedNearlineRecord(msg) { nearlineRecord =>
        mxsConfig.internalArchiveVaultId match {
          case Some(internalArchiveVaultId) =>
            matrixStoreBuilder.withVaultFuture(internalArchiveVaultId) { internalArchiveVault =>
              handleInternalArchiveCompleteForOnline(internalArchiveVault, nearlineRecord)
            }
          case None =>
            failInternalArchiveVaultNotConfigured
        }
      }

    case _ =>
      logger.warn(s"Dropping message $routingKey from project-restorer exchange as I don't know how to handle it.")
      Future.failed(new RuntimeException(s"Routing key $routingKey dropped because I don't know how to handle it"))
  }

/**
 * Decodes `msg` as a `NearlineRecord` and hands the result to `handler`.
 *
 * @param msg     message body to decode
 * @param handler continuation invoked with the decoded record
 * @return the handler's Future, or a Future failed with a `RuntimeException` describing the
 *         decoding error if `msg` is not a valid `NearlineRecord`
 */
private def withDecodedNearlineRecord(msg: Json)(handler: NearlineRecord => Future[Either[String, MessageProcessorReturnValue]]): Future[Either[String, MessageProcessorReturnValue]] =
  msg.as[NearlineRecord] match {
    case Left(err) =>
      Future.failed(new RuntimeException(s"Could not unmarshal json message ${msg.noSpaces} into a NearlineRecord: $err"))
    case Right(nearlineRecord) =>
      handler(nearlineRecord)
  }

/**
 * Logs that the internal archive vault ID is missing from the configuration and returns a
 * Future failed with a `RuntimeException`.
 */
private def failInternalArchiveVaultNotConfigured: Future[Either[String, MessageProcessorReturnValue]] = {
  logger.error("The internal archive vault ID has not been configured, so it's not possible to check if item is backed-up on internal archive.")
  Future.failed(new RuntimeException("Internal archive vault not configured"))
}