def uploadIfRequiredAndNotExists()

in online_nearline/src/main/scala/VidispineMessageProcessor.scala [128:203]


  /**
   * Copies the file at `absPath` into `vault` and records the outcome.
   *
   * Steps, in order:
   *  1. look up an existing nearline record for `absPath`; when there is none, fall back to
   *     whatever `checkForPreExistingFiles` returns;
   *  2. look up an existing failure record for `absPath` and hand it to `showPreviousFailure`;
   *  3. copy the file with `fileCopier.copyFileToMatrixStore`;
   *  4. when the copy yields `Right(objectId)`, write a nearline record (the existing one updated,
   *     or a fresh one with a new correlation id) and then call
   *     `VidispineHelper.updateVidispineWithMXSId` with the item id and the saved record.
   *
   * A `Left` returned by the copy is passed through unchanged and no failure record is written for it.
   * A failed Future anywhere in the chain is logged, persisted as a `FailureRecord` with
   * `RetryStates.WillRetry`, and converted into a `Left` holding the error text.
   *
   * Note that `Paths.get` and `checkFileExists` run synchronously before the Future chain is built,
   * so anything they throw is NOT covered by the recovery handler and propagates to the caller.
   *
   * @param vault         vault passed to the pre-existing-file check and to the file copier
   * @param absPath       path of the source file; also the key used for the nearline/failure record lookups
   * @param mediaIngested supplies the Vidispine item id and essence version stored on the nearline record
   * @return a Future of `Left(message)` on failure, or of whatever `updateVidispineWithMXSId` returns
   */
  def uploadIfRequiredAndNotExists(vault: Vault, absPath: String,
                                   mediaIngested: QueryableItem): Future[Either[String, MessageProcessorReturnValue]] = {
    logger.debug(s"uploadIfRequiredAndNotExists: Original file is $absPath")

    val fullPath = Paths.get(absPath)
    // NOTE(review): the result of checkFileExists is discarded; presumably it signals a missing file by throwing - confirm
    checkFileExists(fullPath)

    val recordsFut = for {
      maybeNearlineRecord <- nearlineRecordDAO.findBySourceFilename(absPath)
      // the value is already computed in the else-branch, so there is no need to schedule a task just to wrap it
      maybeUpdatedRecord <- if(maybeNearlineRecord.isEmpty) checkForPreExistingFiles(vault, fullPath, mediaIngested) else Future.successful(maybeNearlineRecord)
      maybeFailureRecord <- failureRecordDAO.findBySourceFilename(absPath)
    } yield (maybeUpdatedRecord, maybeFailureRecord)

    recordsFut.flatMap(result => {
      val (maybeNearlineRecord, maybeFailureRecord) = result

      showPreviousFailure(maybeFailureRecord, absPath)

      //make the correlation id of a known record available to logging before the copy starts
      maybeNearlineRecord.foreach(rec=>MDC.put("correlationId", rec.correlationId))

      fileCopier.copyFileToMatrixStore(vault, fullPath.getFileName.toString, fullPath)
        .flatMap({
          case Right(objectId) =>
            val record = maybeNearlineRecord match {
              case Some(rec) =>
                rec
                  .copy(
                    objectId = objectId,
                    originalFilePath = fullPath.toString,
                    vidispineItemId = mediaIngested.itemId,
                    vidispineVersionId = mediaIngested.essenceVersion
                  )
              case None =>
                NearlineRecord(
                  None,
                  objectId = objectId,
                  originalFilePath = fullPath.toString,
                  vidispineItemId = mediaIngested.itemId,
                  vidispineVersionId = mediaIngested.essenceVersion,
                  None,
                  None,
                  correlationId = newCorrelationId
                )
            }
            //covers the freshly-created record too, which only gets its correlation id here
            MDC.put("correlationId", record.correlationId)

            nearlineRecordDAO.writeRecord(record).flatMap(recId => {
              val updatedRecord = record.copy(id=Some(recId))
              //ensure that Vidispine is updated with the MXS ID whenever the media changes
              mediaIngested.itemId match {
                case Some(itemId) =>
                  VidispineHelper.updateVidispineWithMXSId(itemId, updatedRecord)
                case None =>
                  //fail the Future with a meaningful message instead of the bare "None.get" that Option.get would give;
                  //the failure is still picked up by the recoverWith below and written as a failure record
                  Future.failed(new RuntimeException(s"Can't update Vidispine with the MXS ID for $fullPath because the ingested media has no item ID"))
              }
            })

          case Left(error) => Future.successful(Left(error))
        })
    }).recoverWith({
      case err:Throwable=>
        //getMessage can legitimately be null; fall back to toString so that neither the failure record
        //nor the returned Left ever carries a null
        val errorMessage = Option(err.getMessage).getOrElse(err.toString)
        logger.error(s"Can't copy to MXS: $errorMessage", err)

        val attemptCount = attemptCountFromMDC().getOrElse({
          logger.warn(s"Could not get attempt count from logging context for $fullPath, creating failure report with attempt 1")
          1
        })

        val rec = FailureRecord(id = None,
          originalFilePath = fullPath.toString,
          attempt = attemptCount,
          errorMessage = errorMessage,
          errorComponent = ErrorComponents.Internal,
          retryState = RetryStates.WillRetry)
        failureRecordDAO.writeRecord(rec).map(_=>Left(errorMessage))
    })
  }