in online_archive/src/main/scala/VidispineMessageProcessor.scala [335:369]
/**
  * Dispatches a message received from the vidispine exchange to the handler matching its routing key.
  *
  * The payload is always decoded as a [[VidispineMediaIngested]] before the routing key is examined, so a
  * payload that cannot be decoded fails the returned Future whichever routing key it arrived with.
  *
  * @param routingKey routing key the message was received with
  * @param msg        message body as JSON
  * @param framework  message processing framework; not used by this method
  * @return the result of the selected handler, or a failed Future when the payload cannot be decoded, when a
  *         shape update lacks its shape ID, shape tag or item ID, or when the routing key is not recognised
  */
override def handleMessage(routingKey: String, msg: Json, framework: MessageProcessingFramework): Future[Either[String, MessageProcessorReturnValue]] = {
  // A shape update is only actionable when shape ID, shape tag and item ID are all present.
  // The missing-field cases are checked in that order, so the first absent field decides which error is reported.
  def dispatchShapeUpdate(update: VidispineMediaIngested): Future[Either[String, MessageProcessorReturnValue]] =
    (update.shapeId, update.shapeTag, update.itemId) match {
      case (Some(shapeId), Some(shapeTag), Some(itemId)) =>
        handleShapeUpdate(shapeId, shapeTag, itemId)
      case (None, _, _) =>
        logger.error("Shape update without any shape ID?")
        Future.failed(new RuntimeException(s"Received shape update ${msg.noSpaces} without any shapeId"))
      case (_, None, _) =>
        logger.error("Shape update without any shape tag?")
        Future.failed(new RuntimeException(s"Received shape update ${msg.noSpaces} without any shapeTag"))
      case (_, _, None) =>
        logger.error("Shape update without any item ID")
        Future.failed(new RuntimeException(s"Received shape update ${msg.noSpaces} without any itemId"))
    }

  logger.info(s"Received message from vidispine with routing key $routingKey")

  msg.as[VidispineMediaIngested] match {
    case Left(err) =>
      // Decoding failures take priority over routing: nothing below runs without a decoded payload.
      Future.failed(new RuntimeException(s"Could not unmarshal json message ${msg.noSpaces} into a VidispineMediaIngested: $err"))

    case Right(payload) =>
      routingKey match {
        case "vidispine.job.essence_version.stop" | "vidispine.job.raw_import.stop" =>
          handleIngestedMedia(payload)

        case "vidispine.itemneedsarchive.nearline" | "vidispine.itemneedsarchive.online" =>
          handleVidispineItemNeedsArchive(payload)

        case "vidispine.item.shape.modify" =>
          dispatchShapeUpdate(payload)

        case "vidispine.item.metadata.modify" =>
          // The metadata handler receives the raw JSON as well as the decoded payload.
          handleMetadataUpdate(msg, payload)

        case _ =>
          logger.warn(s"Dropping message $routingKey from vidispine exchange as I don't know how to handle it. This should be fixed in" +
            s" the code.")
          Future.failed(new RuntimeException("Not meant to receive this"))
      }
  }
}