in online_nearline/src/main/scala/VidispineMessageProcessor.scala [128:203]
/**
  * Copies the file at `absPath` into the given vault and records the outcome.
  *
  * Steps, in order:
  *  1. Look up an existing nearline record for `absPath`; when there is none, ask
  *     `checkForPreExistingFiles` for one instead. Also look up any previous failure record.
  *  2. Call `fileCopier.copyFileToMatrixStore` for the file.
  *  3. If the copy yields a `Right(objectId)`, create or update the nearline record with that object id
  *     and the item/version ids from `mediaIngested`, write it through `nearlineRecordDAO`, then hand the
  *     stored record to `VidispineHelper.updateVidispineWithMXSId`.
  *  4. If the copy yields a `Left`, pass it through unchanged.
  *
  * If the asynchronous chain fails with an exception, a `FailureRecord` with retry state `WillRetry` is
  * written and the error text is returned as a `Left`.
  *
  * @param vault         vault passed to the file copier and to `checkForPreExistingFiles`
  * @param absPath       path of the source file, as a string
  * @param mediaIngested item supplying the `itemId` and `essenceVersion` stored on the nearline record
  * @return a Future holding either an error description (Left) or the result of the Vidispine update (Right)
  */
def uploadIfRequiredAndNotExists(vault: Vault, absPath: String,
                                 mediaIngested: QueryableItem): Future[Either[String, MessageProcessorReturnValue]] = {
  logger.debug(s"uploadIfRequiredAndNotExists: Original file is $absPath")
  val fullPath = Paths.get(absPath)
  // NOTE(review): the result of checkFileExists is not used here; presumably it throws or logs
  // when the file is missing - confirm against its definition.
  checkFileExists(fullPath)

  val recordsFut = for {
    maybeNearlineRecord <- nearlineRecordDAO.findBySourceFilename(absPath)
    // Only search for pre-existing files when there is no nearline record for this path yet.
    maybeUpdatedRecord <- if (maybeNearlineRecord.isEmpty) checkForPreExistingFiles(vault, fullPath, mediaIngested) else Future.successful(maybeNearlineRecord)
    maybeFailureRecord <- failureRecordDAO.findBySourceFilename(absPath)
  } yield (maybeUpdatedRecord, maybeFailureRecord)

  recordsFut.flatMap { case (maybeNearlineRecord, maybeFailureRecord) =>
    showPreviousFailure(maybeFailureRecord, absPath)
    maybeNearlineRecord.foreach(rec => MDC.put("correlationId", rec.correlationId))

    fileCopier.copyFileToMatrixStore(vault, fullPath.getFileName.toString, fullPath).flatMap {
      case Right(objectId) =>
        val record = maybeNearlineRecord match {
          case Some(rec) =>
            // Keep the existing record (and its correlation id), refreshing the fields tied to this media.
            rec.copy(
              objectId = objectId,
              originalFilePath = fullPath.toString,
              vidispineItemId = mediaIngested.itemId,
              vidispineVersionId = mediaIngested.essenceVersion
            )
          case None =>
            NearlineRecord(
              None,
              objectId = objectId,
              originalFilePath = fullPath.toString,
              vidispineItemId = mediaIngested.itemId,
              vidispineVersionId = mediaIngested.essenceVersion,
              None,
              None,
              correlationId = newCorrelationId
            )
        }
        MDC.put("correlationId", record.correlationId)
        for {
          recId <- nearlineRecordDAO.writeRecord(record)
          updatedRecord = record.copy(id = Some(recId))
          // Ensure that Vidispine is updated with the MXS ID whenever the media changes.
          // NOTE(review): `.get` fails this future if itemId is empty, and the recoverWith below would then
          // report it as a copy failure - confirm itemId is always populated at this point.
          result <- VidispineHelper.updateVidispineWithMXSId(mediaIngested.itemId.get, updatedRecord)
        } yield result
      case Left(error) =>
        Future.successful(Left(error))
    }
  }.recoverWith {
    case err: Throwable =>
      // Throwable.getMessage may be null; fall back to toString so that neither the failure record
      // nor the returned Left ever carries a null message.
      val errorText = Option(err.getMessage).getOrElse(err.toString)
      logger.error(s"Can't copy to MXS: $errorText", err)
      val attemptCount = attemptCountFromMDC().getOrElse {
        logger.warn(s"Could not get attempt count from logging context for $fullPath, creating failure report with attempt 1")
        1
      }
      val failure = FailureRecord(
        id = None,
        originalFilePath = fullPath.toString,
        attempt = attemptCount,
        errorMessage = errorText,
        errorComponent = ErrorComponents.Internal,
        retryState = RetryStates.WillRetry
      )
      failureRecordDAO.writeRecord(failure).map(_ => Left(errorText))
  }
}