in media_remover/src/main/scala/MediaNotRequiredMessageProcessor.scala [47:90]
/** Produces a new correlation ID: the string form of a randomly generated UUID. */
protected def newCorrelationId: String = {
  val freshId = UUID.randomUUID()
  freshId.toString
}
/**
 * Routes an incoming message to the matching "media not required" handler, based on its routing key.
 *
 * Both recognised routing keys carry an `OnlineOutputMessage` payload and need the same two vaults
 * (nearline and internal archive), so decoding and vault lookup happen once on a shared path and only
 * the final handler call differs.
 *
 * @param routingKey routing key the message arrived with
 * @param msg        JSON body; decoded as an `OnlineOutputMessage` for the recognised routing keys
 * @param framework  message processing framework; not referenced by this method
 * @return the result of the selected handler, or a failed Future (carrying a RuntimeException) when the
 *         routing key is not recognised, the body cannot be decoded, or no internal archive vault ID
 *         is configured
 */
override def handleMessage(routingKey: String, msg: Json, framework: MessageProcessingFramework): Future[Either[String, MessageProcessorReturnValue]] = {
  // Capitalised local vals so they can be used as stable-identifier patterns in the match below.
  val NearlineRoutingKey = "storagetier.restorer.media_not_required.nearline"
  val OnlineRoutingKey = "storagetier.restorer.media_not_required.online"

  routingKey match {
    case NearlineRoutingKey | OnlineRoutingKey =>
      msg.as[OnlineOutputMessage] match {
        case Left(err) =>
          Future.failed(new RuntimeException(s"Could not unmarshal json message ${msg.noSpaces} into a OnlineOutputMessage: $err"))
        case Right(onlineOutputMessage) =>
          mxsConfig.internalArchiveVaultId match {
            case Some(internalArchiveVaultId) =>
              matrixStoreBuilder.withVaultsFuture(Seq(mxsConfig.nearlineVaultId, internalArchiveVaultId)) { vaults =>
                // NOTE(review): assumes withVaultsFuture yields the vaults in the same order as the
                // requested IDs (nearline first, internal archive second) - confirm against its definition.
                val nearlineVault = vaults.head
                val internalArchiveVault = vaults(1)
                // The only point where the two routing keys diverge.
                if (routingKey == NearlineRoutingKey)
                  handleNearlineMediaNotRequired(nearlineVault, internalArchiveVault, onlineOutputMessage)
                else
                  handleOnlineMediaNotRequired(nearlineVault, internalArchiveVault, onlineOutputMessage)
              }
            case None =>
              logger.error("The internal archive vault ID has not been configured, so it's not possible to send an item to internal archive.")
              Future.failed(new RuntimeException("Internal archive vault not configured"))
          }
      }
    case _ =>
      logger.warn(s"Dropping message $routingKey from project-restorer exchange as I don't know how to handle it.")
      Future.failed(new RuntimeException(s"Routing key $routingKey dropped because I don't know how to handle it"))
  }
}