in online_nearline/src/main/scala/OwnMessageProcessor.scala [230:274]
/**
 * Delegates directly to `Copier.doCrossCopy`, passing the three arguments through unchanged.
 *
 * NOTE(review): presumably kept as a separate protected method so that a subclass (e.g. in tests) can
 * override the copy step - confirm against the test suite.
 *
 * @param nearlineVault vault holding the source object
 * @param sourceOID     object ID of the item to copy
 * @param destVault     vault that the object should be copied onto
 * @return whatever `Copier.doCrossCopy` returns; `handleInternalArchiveRequested` in this file treats it as a
 *         Future yielding the object ID that was written
 */
protected def callCrossCopy(nearlineVault: Vault, sourceOID: String, destVault: Vault) =
  Copier.doCrossCopy(nearlineVault, sourceOID, destVault)
/**
 * Handles an "internal archive requested" message: copies the nearline object referenced by the message onto
 * the configured internal archive vault (if it is not already there) and then flags the record as internally
 * archived through `nearlineRecordDAO`.
 *
 * Outcomes, as implemented below:
 *  - the message does not decode as a [[NearlineRecord]]: failed Future (RuntimeException)
 *  - `mxsConfig.internalArchiveVaultId` is not set: failed Future (RuntimeException)
 *  - `isCopyNeeded` yields false: failed Future carrying `SilentDropMessage`
 *  - copy and record update both succeed: `Right` containing the updated record as JSON
 *  - the copy or the subsequent record update fails (including a record with no id, or an id the DAO does not
 *    recognise): `Left` with the error message, i.e. reported as a retryable failure
 *
 * Failures raised by `withVaultsFuture` or `isCopyNeeded` themselves are not recovered here and propagate as-is.
 *
 * @param msg JSON body of the incoming message, expected to decode as a [[NearlineRecord]]
 * @return a Future of either an error description (Left) or the updated record (Right), as detailed above
 */
def handleInternalArchiveRequested(msg: Json):Future[Either[String, MessageProcessorReturnValue]] = msg.as[NearlineRecord] match {
  case Left(err)=>
    Future.failed(new RuntimeException(s"Could not parse message as a nearline record: $err"))
  case Right(rec)=>
    //make the correlation ID available to subsequent log lines via the logging MDC
    MDC.put("correlationId", rec.correlationId)
    mxsConfig.internalArchiveVaultId match {
      case Some(internalArchiveVaultId) =>
        //this message has been output by `applyCustomMetadata` above. So we can assume that (a) the source object ID in the message is
        //valid, and (b) that it has the right metadata to copy from. All we need to do is to kick off the copy.
        mxsConnectionBuilder.withVaultsFuture(Seq(mxsConfig.nearlineVaultId, internalArchiveVaultId)) { vaults =>
          //vaults are read back in the same order as the IDs were requested: nearline first, archive second
          val nearlineVault = vaults.head
          val internalArchiveVault = vaults(1)
          isCopyNeeded(nearlineVault, internalArchiveVault, rec).flatMap({
            case true =>
              callCrossCopy(nearlineVault, rec.objectId, internalArchiveVault)
                .flatMap(writtenOid => {
                  logger.info(s"Copied from ${rec.objectId} to $writtenOid for ${rec.originalFilePath}")
                  rec.id match {
                    case Some(recordId) =>
                      nearlineRecordDAO
                        .setInternallyArchived(recordId, true)
                        .map({
                          case Some(updatedRec) =>
                            Right(updatedRec.asJson)
                          case None =>
                            throw new RuntimeException(s"Record id ${rec.id} is not valid!")
                        })
                    case None =>
                      //previously this was `rec.id.get`, which surfaced as an opaque "None.get" error message.
                      //Fail explicitly instead; the `.recover` below still turns this into a Left.
                      Future.failed(new RuntimeException(s"Record for ${rec.originalFilePath} has no id, so it cannot be marked as internally archived"))
                  }
                })
                .recover({
                  case err: Throwable => //handle a copy error as a retryable failure, likelihood is that it's to do with appliance load.
                    logger.error(s"Could not copy entry ${rec.objectId} onto vault $internalArchiveVaultId: ${err.getMessage}", err)
                    Left(err.getMessage)
                })
            case false=>
              logger.info(s"${rec.originalFilePath} already exists in the archive vault, no copy is needed")
              Future.failed(SilentDropMessage(Some(s"${rec.originalFilePath} is already archived")))
          })
        }
      case None=>
        logger.error(s"The internal archive vault ID has not been configured, so it's not possible to send an item to internal archive.")
        Future.failed(new RuntimeException(s"Internal archive vault not configured"))
    }
}