in online_nearline/src/main/scala/VidispineMessageProcessor.scala [638:714]
/**
  * Reacts to a Vidispine "metadata updated" message: produces the metadata XML for the item, streams it
  * into the nearline vault (`mxsConfig.nearlineVaultId`) and stores the id of the resulting object on the
  * item's nearline record.
  *
  * Processing steps:
  *  1. Look up the nearline record by Vidispine item id. If the database has none, check whether any of the
  *     item's original files is already present in the vault; the first match is written to the database
  *     and used from then on.
  *  2. Call `buildMetaForXML` for the record, then `streamVidispineMeta` with its result.
  *  3. Ask `updateParentsMetadata` to set `ATT_META_OID` on the record's object (a failure here is only
  *     logged), then write the record back with `metadataXMLObjectId` set to the copied object's id.
  *
  * @param metadataUpdated the incoming Vidispine message; it is expected to carry an `itemId`
  * @return a failed Future (RuntimeException) when the message has no item id. Otherwise a Future of
  *         `Left(reason)` when no nearline record could be found or created, when `buildMetaForXML` yields
  *         nothing, or when `streamVidispineMeta` reports an error (the first two are treated as retryable);
  *         or `Right` holding the updated record rendered via `asJson` on success.
  */
def handleMetadataUpdate(metadataUpdated:VidispineMediaIngested):Future[Either[String, MessageProcessorReturnValue]] = {
  metadataUpdated.itemId match {
    case None =>
      logger.error(s"Incoming vidispine message $metadataUpdated has no itemId")
      Future.failed(new RuntimeException("no vidispine item id provided"))
    case Some(itemId) =>
      matrixStoreBuilder.withVaultFuture(mxsConfig.nearlineVaultId) { vault =>
        nearlineRecordDAO
          .findByVidispineId(itemId)
          .flatMap({
            case foundRecord @ Some(rec) =>
              MDC.put("correlationId", rec.correlationId)
              //we found a record in the database, happy days. The value already exists, so there is no need to schedule a task for it.
              Future.successful(foundRecord)
            case None => //we didn't find a record in the database, but maybe the file does exist already?
              val maybeNewEntryList = for {
                files <- getOriginalFilesForItem(itemId)
                //kept inside Future(...) so that an exception from Paths.get becomes a failed Future handled by the recover below
                filePaths <- Future(files.map(_.path).map(Paths.get(_)))
                possibleEntries <- {
                  logger.info(s"Checking for existing files for vidispine ID $itemId. File path list is $filePaths.")
                  Future.sequence(filePaths.map(path => checkForPreExistingFiles(vault, path, metadataUpdated, shouldSave = false)))
                }
              } yield possibleEntries

              maybeNewEntryList
                .map(_.collectFirst({ case Some(newRec) => newRec })) //only the first pre-existing file is used
                .flatMap({
                  case Some(newRec) =>
                    //we got a record. Write it to the database and continue.
                    logger.info(s"Found existing file at ${newRec.originalFilePath} with OID ${newRec.objectId} for vidispine item ${newRec.vidispineItemId}")
                    nearlineRecordDAO
                      .writeRecord(newRec)
                      .map(recordId => newRec.copy(id = Some(recordId)))
                      .map(Some(_))
                  case None =>
                    logger.info(s"No files found in the nearline storage for vidispine item ID $itemId")
                    Future.successful(None)
                })
                .recover({
                  case err: Throwable =>
                    logger.error(s"Could not search for pre-existing files for vidispine item $itemId: ${err.getMessage}", err)
                    None //this will cause a retryable failure to be logged out below.
                })
          })
          .flatMap({
            case None =>
              logger.info(s"No record of vidispine item $itemId yet.")
              Future.successful(Left(s"No record of vidispine item $itemId yet.")) //this is retryable, assume that the item has not finished importing yet
            case Some(nearlineRecord: NearlineRecord) =>
              //set again here: this callback is not guaranteed to run on the thread that performed the lookup above
              MDC.put("correlationId", nearlineRecord.correlationId)
              buildMetaForXML(vault, nearlineRecord, itemId).flatMap({
                case None =>
                  //this is a retryable failure; sometimes the "updated metadata" message will arrive earlier than the media has finished processing.
                  logger.info(s"The object ${nearlineRecord.objectId} for file ${nearlineRecord.originalFilePath} does not have GNM compatible metadata attached to it yet. This can mean that ingest or extraction is still in progress.")
                  Future.successful(Left(s"Object ${nearlineRecord.objectId} does not have GNM compatible metadata"))
                case Some(updatedMetadata) =>
                  streamVidispineMeta(vault, itemId, updatedMetadata).flatMap({
                    case Right((copiedId, maybeChecksum)) =>
                      updateParentsMetadata(vault, nearlineRecord.objectId, "ATT_META_OID", copiedId) match {
                        case Success(_) => ()
                        case Failure(err) =>
                          //this is not a fatal error.
                          logger.warn(s"Could not update metadata on ${nearlineRecord.objectId} to set metadata attachment: ${err.getMessage}")
                      }
                      logger.info(s"Metadata xml for $itemId is copied to file $copiedId with checksum ${maybeChecksum.getOrElse("(none)")}")
                      val updatedRec = nearlineRecord.copy(metadataXMLObjectId = Some(copiedId))
                      nearlineRecordDAO
                        .writeRecord(updatedRec)
                        .map(_ => Right(updatedRec.asJson))
                    case Left(err) =>
                      Future.successful(Left(err))
                  })
              })
          })
      }
  }
}