app/services/FileMove/DeleteOriginalFiles.scala (117 lines of code) (raw):

package services.FileMove

import com.theguardian.multimedia.archivehunter.common.clientManagers.S3ClientManager
import com.theguardian.multimedia.archivehunter.common.{DocId, Indexer, ProxyLocation}
import play.api.Configuration
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{DeleteObjectRequest, NoSuchKeyException}

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import scala.concurrent.ExecutionContext.Implicits.global

/**
  * Final step of a file move: delete the original media file and its proxies from the source bucket(s).
  * Deletion is only attempted once both the moved media and every moved proxy have been verified
  * against their sources; any verification failure aborts the step without deleting anything.
  *
  * @param s3ClientMgr An instance of [[S3ClientManager]] to obtain S3 client objects for the relevant region
  * @param indexer an instance of [[Indexer]] to access content in the main index
  * @param config application configuration; `externalData.awsProfile` is read to select the AWS profile
  */
class DeleteOriginalFiles(s3ClientMgr:S3ClientManager, indexer:Indexer, config:Configuration) extends GenericMoveActor with DocId {
  import GenericMoveActor._
  import com.theguardian.multimedia.archivehunter.common.cmn_helpers.S3ClientExtensions._

  /**
    * Produce a non-null description of an error, falling back to `toString` when the
    * exception carries no message (`getMessage` may legitimately return null).
    */
  private def describeError(err:Throwable):String = Option(err.getMessage).getOrElse(err.toString)

  /**
    * Verify that the object pointed to by (destBucket, destPath) looks like a copy of (srcBucket, srcPath).
    * Both objects must exist and their content lengths must be equal; no checksum comparison is performed here.
    * This is an internal method which is called by `verifyNewMedia` and `verifyProxyFiles`.
    *
    * @param srcBucket bucket containing the source file
    * @param srcPath path to the source file
    * @param destBucket bucket containing the destination file
    * @param destPath path to the destination file
    * @param sourceClient S3 client used to read the source object's metadata
    * @param destClient S3 client used to read the destination object's metadata
    * @return either a message explaining why the files don't match (one does not exist, size mismatch,
    *         or another error while fetching metadata) or a Right with no contents if they do
    */
  protected def verifyFile(srcBucket:String, srcPath:String, destBucket:String, destPath:String, sourceClient:S3Client, destClient:S3Client):Either[String, Unit] = {
    //destination is queried first, then source; both lookups always happen
    val destLookup = destClient.getObjectMetadata(destBucket, destPath, None)
    val srcLookup = sourceClient.getObjectMetadata(srcBucket, srcPath, None)

    (destLookup, srcLookup) match {
      case (Failure(_:NoSuchKeyException), _) =>
        Left(s"Destination file s3://$destBucket/$destPath does not exist")
      case (_, Failure(_:NoSuchKeyException)) =>
        Left(s"Source file s3://$srcBucket/$srcPath does not exist")
      case (Success(destFileMeta), Success(srcFileMeta)) =>
        if (destFileMeta.contentLength() != srcFileMeta.contentLength()) {
          Left(s"Destination size was ${destFileMeta.contentLength()} vs ${srcFileMeta.contentLength()}")
        } else {
          Right(())
        }
      case (Failure(err), _) =>
        Left(describeError(err))
      case (_, Failure(err)) =>
        Left(describeError(err))
    }
  }

  /**
    * Verify that the new media file is in place in the destination bucket and that its size matches the source.
    * The destination is expected at the same path as the source, in `state.destBucket`.
    *
    * @param state [[FileMoveTransientData]] giving the state of the move
    * @param sourceClient S3 client for the source region
    * @param destClient S3 client for the destination region
    * @return either a string indicating why the file is not good (including when the state carries no
    *         archive entry) or the Unit value
    */
  def verifyNewMedia(state:FileMoveTransientData, sourceClient:S3Client, destClient:S3Client):Either[String,Unit] =
    state.entry match {
      case Some(archiveEntry) =>
        verifyFile(archiveEntry.bucket, archiveEntry.path, state.destBucket, archiveEntry.path, sourceClient, destClient)
      case None =>
        Left("No archive entry present in the move state, can't verify the new media")
    }

  /**
    * Verify that each new proxy file listed in the state is in place and that its size matches the source.
    * Source and destination proxy lists are compared pairwise, so they are assumed to be in the same order.
    * Having no source proxies at all is treated as success (with a warning).
    *
    * @param state [[FileMoveTransientData]] giving the state of the move
    * @param sourceClient S3 client for the source region
    * @param destClient S3 client for the destination region
    * @return either a string indicating why the proxies are not good or the Unit value
    */
  def verifyProxyFiles(state:FileMoveTransientData, sourceClient:S3Client, destClient:S3Client):Either[String,Unit] =
    (state.sourceFileProxies, state.destFileProxy) match {
      case (None, _) =>
        logger.warn(s"No source file proxies to verify")
        Right(())
      case (Some(srcProxies), None) =>
        val problem = s"Have ${srcProxies.length} source proxies but no destination proxies defined"
        logger.error(problem)
        Left(problem)
      case (Some(srcProxies), Some(dstProxies)) if srcProxies.length != dstProxies.length =>
        Left(s"Source had ${srcProxies.length} proxies but destination has ${dstProxies.length}")
      case (Some(srcProxies), Some(dstProxies)) =>
        //the iterator is lazy, so verification stops at the first proxy pair that fails
        srcProxies.iterator
          .zip(dstProxies.iterator)
          .map({ case (src, dst) =>
            verifyFile(src.bucketName, src.bucketPath, dst.bucketName, dst.bucketPath, sourceClient, destClient)
          })
          .find(_.isLeft)
          .getOrElse(Right(()))
    }

  /**
    * Delete a single object asynchronously. The delete call is contained in a Try so that, when several
    * deletions are run in parallel, _every_ failure can be collected and not just the first one.
    *
    * @param bucket bucket to delete from
    * @param path path to the file to delete
    * @param s3Client implicitly provided S3 Client
    * @return a Future, containing a Try with either the result of the delete operation or an error
    */
  def tryToDelete(bucket:String, path:String)(implicit s3Client:S3Client) = Future {
    Try {
      s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(path).build())
    }
  }

  /**
    * Delete the source media file and all of its source proxies, in parallel.
    * All deletions are attempted; every failure is logged and the message of the first one is returned.
    *
    * @param state [[FileMoveTransientData]] giving the state of the move
    * @param s3Client implicitly provided S3 Client for the source region
    * @return a Future containing either a description of the first failure (or of a missing archive entry)
    *         or the Unit value if everything was deleted
    */
  def deleteAllFor(state:FileMoveTransientData)(implicit s3Client:S3Client):Future[Either[String, Unit]] =
    state.entry match {
      case None =>
        Future.successful(Left("No archive entry present in the move state, can't delete the original files"))
      case Some(archiveEntry) =>
        val proxyTargets = state.sourceFileProxies.getOrElse(Seq()).map(prx=>(prx.bucketName, prx.bucketPath))
        val allTargets = proxyTargets :+ ((archiveEntry.bucket, archiveEntry.path))

        Future
          .sequence(allTargets.map({ case (bucket, path) => tryToDelete(bucket, path) }))
          .map(results=>{
            val failures = results.collect({ case Failure(err) => err })
            failures.headOption match {
              case Some(firstFailure) =>
                logger.error(s"${failures.length} files failed to delete: ")
                failures.foreach(err=>logger.error(s"\tFailed to delete: ", err))
                Left(describeError(firstFailure))
              case None =>
                logger.info(s"Deleted ${results.length} files")
                Right(())
            }
          })
    }

  /**
    * Message handler.
    *  - `PerformStep`: verifies the moved media and then the moved proxies; only if BOTH verify are the
    *    originals deleted. Replies with `StepSucceeded` (with `sourceFileProxies` cleared) or `StepFailed`.
    *  - `RollbackStep`: always replies with `StepFailed`, as a deletion can't be undone.
    */
  override def receive: Receive = {
    case PerformStep(state)=>
      try {
        val awsProfile = config.getOptional[String]("externalData.awsProfile")
        val sourceClient:S3Client = s3ClientMgr.getS3Client(
          profileName=awsProfile,
          region=state.entry.flatMap(_.region).map(Region.of))
        val destClient:S3Client = s3ClientMgr.getS3Client(
          profileName=awsProfile,
          region=Some(state.destRegion).map(Region.of))

        //flatMap, not map: a proxy verification failure must surface as a Left here. With map the failure
        //would be wrapped as Right(Left(...)) and the originals would be deleted regardless.
        verifyNewMedia(state, sourceClient, destClient).flatMap(_=>verifyProxyFiles(state, sourceClient, destClient)) match {
          case Left(problem)=>
            sender() ! StepFailed(state, problem)
          case Right(_)=>
            //media and proxies both verified, can proceed to deletion.
            //sender() must be captured now, it is not safe to call from the Future callback
            val originalSender = sender()
            deleteAllFor(state)(sourceClient).onComplete({
              case Success(Right(_))=>
                logger.info(s"Deletion completed")
                val updatedState = state.copy(sourceFileProxies = None)
                originalSender ! StepSucceeded(updatedState)
              case Success(Left(err))=>
                logger.error(s"Some or all deletions failed")
                originalSender ! StepFailed(state, err)
              case Failure(err)=>
                logger.error(s"Deletion thread(s) failed: ", err)
                originalSender ! StepFailed(state, describeError(err))
            })
        }
      } catch {
        //only non-fatal errors are turned into a step failure; fatal JVM errors are left to propagate
        case NonFatal(err)=>
          logger.error(s"Deletion of original files failed: ${err.getClass.getCanonicalName} ${err.getMessage}", err)
          sender() ! StepFailed(state, describeError(err))
      }

    case RollbackStep(state)=>
      logger.error(s"Can't roll back file deletion!")
      sender() ! StepFailed(state, "Can't roll back file deletion!")
  }
}