in media_remover/src/main/scala/OnlineArchiveMessageProcessor.scala [98:145]
/**
 * Handles a "deep archive complete" confirmation for nearline media by looking up the pending
 * deletion record(s) for the archived record and deciding whether the nearline copy can be deleted.
 *
 * Known limitation (FIXME, from the original author notes): an `ArchivedRecord` does not carry a
 * nearline id, and `originalFilePath` can be shared by more than one nearline media item, so the
 * lookup may return several pending deletion records. The intent noted by the author is to locate
 * the pending deletion record via `originalFilePath` but to use `uploadedPath` when checking
 * against S3.
 *
 * Outcomes, by number of pending deletion records found:
 *  - none: the future fails with `SilentDropMessage`.
 *  - exactly one: if it has a nearline id, the deep archive is checked; the media is deleted when a
 *    copy exists, otherwise `retryRequireCopyOrFail` is invoked. Without a nearline id the future
 *    fails with a `RuntimeException`.
 *  - several: every record that has a nearline id is checked; the first one confirmed to exist in
 *    the deep archive is deleted. If none is confirmed (or none has a nearline id),
 *    `retryRequireCopyOrFail` is invoked.
 *
 * @param vault          vault handed on to the existence check and the deletion
 * @param archivedRecord the archive confirmation being processed
 * @return the result of `handleNearlineDeletion` / `retryRequireCopyOrFail`, or a `Left` describing
 *         the failure when the check-and-delete chain fails with an error whose message does not
 *         start with "Cannot request deep archive copy"
 */
def handleDeepArchiveCompleteForNearline(vault: Vault, archivedRecord: ArchivedRecord): Future[Either[String, MessageProcessorReturnValue]] = {
  // We only want to transform S3 connection errors to Lefts; failures whose message starts with
  // "Cannot request deep archive copy" are left untouched so they propagate as failed futures.
  // getMessage may legitimately be null, so it is wrapped in Option to avoid a NullPointerException
  // inside the guard (which would otherwise replace the real failure).
  val connectionErrorToLeft: PartialFunction[Throwable, Either[String, MessageProcessorReturnValue]] = {
    case err: Throwable if !Option(err.getMessage).exists(_.startsWith("Cannot request deep archive copy")) =>
      val msg = s"Could not connect to deep archive to check if copy of ${MediaTiers.NEARLINE} media exists, do not delete. Reason: ${err.getMessage}"
      logger.info(msg)
      Left(msg)
  }

  findPendingDeletionRecords(archivedRecord).flatMap(recs => recs.length match {
    case 0 =>
      val msg = s"ignoring archive confirmation, no pending deletion for this ${MediaTiers.NEARLINE} item with ${archivedRecord.originalFilePath}"
      logger.debug(msg)
      Future.failed(SilentDropMessage(Some(msg)))

    case 1 =>
      val rec = recs.head
      rec.nearlineId match {
        case Some(nearlineId) =>
          getNearlineDataAndCheckExistsInDeepArchive(vault, archivedRecord, rec, nearlineId).flatMap({
            case true => handleNearlineDeletion(vault, rec)
            case false => retryRequireCopyOrFail(rec, nearlineId)
          }).recover(connectionErrorToLeft)
        case None =>
          val msg = s"No nearline ID in pending deletion rec for media ${archivedRecord.originalFilePath}"
          logger.warn(msg)
          Future.failed(new RuntimeException(msg))
      }

    case _ =>
      // Pair each record with its nearline id up front so no Option.get is needed later.
      val recsWithNearlineId = for {
        rec <- recs
        nearlineId <- rec.nearlineId
      } yield (rec, nearlineId)

      if (recsWithNearlineId.isEmpty) {
        retryRequireCopyOrFail(archivedRecord, recs)
      } else {
        // All existence checks are started here, before being sequenced.
        val existenceChecks = recsWithNearlineId.map({ case (rec, nearlineId) =>
          getNearlineDataAndCheckExistsInDeepArchive(vault, archivedRecord, rec, nearlineId).map(exists => (exists, rec))
        })
        Future.sequence(existenceChecks).flatMap(_.collectFirst({ case (true, rec) => rec }) match {
          // Delete using the record itself (not the (exists, record) tuple), as in the single-record case.
          case Some(rec) => handleNearlineDeletion(vault, rec)
          case None => retryRequireCopyOrFail(archivedRecord, recs)
        }).recover(connectionErrorToLeft)
      }
  })
}