in online_nearline/src/main/scala/VidispineMessageProcessor.scala [726:769]
/**
  * Dispatches a message received from the Vidispine exchange to the handler matching its routing key.
  *
  * The body is first decoded as a `VidispineMediaIngested`. If decoding fails, the routing key is not
  * recognised, or a field required by the selected handler is missing, the result is a failed Future
  * rather than a `Left`.
  *
  * @param routingKey routing key the message was delivered with; selects the handler
  * @param msg        message body as JSON
  * @param framework  message processing framework; not referenced by this method
  * @return the Future produced by the selected handler, or a failed Future as described above
  */
override def handleMessage(routingKey: String, msg: Json, framework: MessageProcessingFramework): Future[Either[String, MessageProcessorReturnValue]] = {
  logger.info(s"Received message from vidispine with routing key $routingKey")
  (msg.as[VidispineMediaIngested], routingKey) match {
    case (Left(err), _) =>
      Future.failed(new RuntimeException(s"Could not unmarshal json message ${msg.noSpaces} into a VidispineMediaIngested: $err"))
    // Both job-stop events are handled identically, so they share a single branch.
    case (Right(mediaIngested), "vidispine.job.raw_import.stop" | "vidispine.job.essence_version.stop") =>
      matrixStoreBuilder.withVaultFuture(mxsConfig.nearlineVaultId) { vault =>
        handleIngestedMedia(vault, mediaIngested)
      }
    case (Right(mediaIngested), "vidispine.item.metadata.modify") =>
      handleMetadataUpdate(mediaIngested)
    case (Right(mediaItemIdOnly), "vidispine.itemneedsbackup") =>
      // itemId is optional; check it up front instead of calling .get, so a message without one
      // yields a descriptive failure (and no vault is requested) rather than a NoSuchElementException.
      mediaItemIdOnly.itemId match {
        case Some(itemId) =>
          matrixStoreBuilder.withVaultFuture(mxsConfig.nearlineVaultId) { vault =>
            handleVidispineItemNeedsBackup(vault, itemId)
          }
        case None =>
          logger.error("Item needs backup message without any item ID")
          Future.failed(new RuntimeException(s"Received item needs backup message ${msg.noSpaces} without any itemId"))
      }
    case (Right(shapeUpdate), "vidispine.item.shape.modify") =>
      // Case order matters: the missing-field checks must come before the "original" tag check,
      // and the final case is only reached when all three fields are present.
      (shapeUpdate.shapeId, shapeUpdate.shapeTag, shapeUpdate.itemId) match {
        case (None, _, _) =>
          logger.error("Shape update without any shape ID?")
          Future.failed(new RuntimeException(s"Received shape update ${msg.noSpaces} without any shapeId"))
        case (_, None, _) =>
          logger.error("Shape update without any shape tag?")
          Future.failed(new RuntimeException(s"Received shape update ${msg.noSpaces} without any shapeTag"))
        case (_, _, None) =>
          logger.error("Shape update without any item ID")
          Future.failed(new RuntimeException(s"Received shape update ${msg.noSpaces} without any itemId"))
        case (_, Some("original"), _) =>
          logger.info(s"Shape tag original handled by new file event, dropping message")
          // NOTE(review): presumably the caller treats SilentDropMessage as "discard without
          // reporting an error" - confirm against the framework.
          Future.failed(SilentDropMessage())
        case (Some(shapeId), Some(_), Some(itemId)) =>
          matrixStoreBuilder.withVaultFuture(mxsConfig.nearlineVaultId) { vault =>
            handleShapeUpdate(vault, shapeUpdate, shapeId, itemId)
          }
      }
    case (_, _) =>
      logger.warn(s"Dropping message $routingKey from vidispine exchange as I don't know how to handle it. This should be fixed in" +
        s" the code.")
      Future.failed(new RuntimeException(s"Routing key $routingKey dropped because I don't know how to handle it"))
  }
}