protected def callCrossCopy()

in online_nearline/src/main/scala/OwnMessageProcessor.scala [230:274]


  /**
   * Copies a single object from one vault to another by delegating straight to
   * `Copier.doCrossCopy`. No extra logic is applied here; the result of the copier call
   * is returned unchanged.
   *
   * NOTE(review): being `protected` and a pure pass-through, this looks like a seam that lets
   * subclasses or tests substitute the copy operation - confirm against the test suite.
   *
   * @param nearlineVault vault that currently holds the object
   * @param sourceOID     object ID of the item to copy, as known to `nearlineVault`
   * @param destVault     vault that the object should be copied onto
   * @return whatever `Copier.doCrossCopy` returns for these arguments
   */
  protected def callCrossCopy(nearlineVault:Vault, sourceOID:String, destVault:Vault) = {
    Copier.doCrossCopy(nearlineVault, sourceOID, destVault)
  }

  /**
   * Handles an "internal archive requested" message: copies the referenced object from the
   * nearline vault onto the internal archive vault (if it is not already there) and then flags
   * the nearline record as internally archived.
   *
   * @param msg message body; must decode as a `NearlineRecord`
   * @return a Future that completes with
   *         - `Right(json)` holding the updated record once the copy and the record update succeed;
   *         - `Left(errorMessage)` if the copy or the subsequent record update fails. This is
   *           reported as a retryable failure, on the assumption that it is load-related;
   *         or fails with
   *         - `RuntimeException` if the message cannot be parsed or no internal archive vault
   *           is configured;
   *         - `SilentDropMessage` if `isCopyNeeded` reports that no copy is required.
   */
  def handleInternalArchiveRequested(msg: Json):Future[Either[String, MessageProcessorReturnValue]] = msg.as[NearlineRecord] match {
    case Left(err)=>
      Future.failed(new RuntimeException(s"Could not parse message as a nearline record: $err"))
    case Right(rec)=>
      MDC.put("correlationId", rec.correlationId)
      mxsConfig.internalArchiveVaultId match {
        case Some(internalArchiveVaultId) =>
          //this message has been output by `applyCustomMetadata` above. So we can assume that (a) the source object ID in the message is
          //valid, and (b) that it has the right metadata to copy from. All we need to do is to kick off the copy.
          mxsConnectionBuilder.withVaultsFuture(Seq(mxsConfig.nearlineVaultId, internalArchiveVaultId)) { vaults =>
            //vaults are used in the same order as the IDs requested above
            val nearlineVault = vaults.head
            val internalArchiveVault = vaults(1)

            isCopyNeeded(nearlineVault, internalArchiveVault, rec).flatMap({
              case true =>
                callCrossCopy(nearlineVault, rec.objectId, internalArchiveVault)
                  .flatMap(writtenOid => {
                    logger.info(s"Copied from ${rec.objectId} to $writtenOid for ${rec.originalFilePath}")
                    rec.id match {
                      case Some(recordId) =>
                        nearlineRecordDAO
                          .setInternallyArchived(recordId, true)
                          .map({
                            case Some(updatedRec) =>
                              Right(updatedRec.asJson)
                            case None =>
                              throw new RuntimeException(s"Record id ${rec.id} is not valid!")
                          })
                      case None =>
                        //previously this was `rec.id.get`, which surfaced as an opaque "None.get" error. Fail with an
                        //explicit message instead; it still passes through the `recover` below exactly as before.
                        Future.failed(new RuntimeException(s"Cannot mark ${rec.originalFilePath} as internally archived because the nearline record has no id"))
                    }
                  })
                  .recover({
                    //note that this covers the record update as well as the copy itself, since it is attached after both.
                    case err: Throwable => //handle a copy error as a retryable failure, likelihood is that it's to do with appliance load.
                      logger.error(s"Could not copy entry ${rec.objectId} onto vault $internalArchiveVaultId: ${err.getMessage}", err)
                      Left(err.getMessage)
                  })
              case false=>
                logger.info(s"${rec.originalFilePath} already exists in the archive vault, no copy is needed")
                Future.failed(SilentDropMessage(Some(s"${rec.originalFilePath} is already archived")))
            })
          }
        case None=>
          logger.error("The internal archive vault ID has not been configured, so it's not possible to send an item to internal archive.")
          Future.failed(new RuntimeException("Internal archive vault not configured"))
      }
  }