app/services/FileMove/CopyMainFile.scala
package services.FileMove

import akka.actor.ActorSystem
import akka.stream.Materializer
import com.amazonaws.regions.Regions
import com.theguardian.multimedia.archivehunter.common.DocId
import com.theguardian.multimedia.archivehunter.common.clientManagers.S3ClientManager
import play.api.Configuration
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{CopyObjectRequest, DeleteObjectRequest}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
import javax.inject.Singleton

/**
* This actor copies a file to the requested destination bucket and updates the internal state with the new file ID.
* When rolling back, it checks whether the source file still exists; if it has gone missing, it first copies the
* file back from the destination, and then deletes the copy it made earlier.
*/
@Singleton
class CopyMainFile (s3ClientManager: S3ClientManager, config:Configuration, largeFileCopier:ImprovedLargeFileCopier)
(implicit val actorSystem: ActorSystem, mat:Materializer) extends GenericMoveActor with DocId {
import GenericMoveActor._
import com.theguardian.multimedia.archivehunter.common.cmn_helpers.S3ClientExtensions._
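// 5GiB (5*1024*1024*1024 bytes) is the upper limit for a single-request S3 CopyObject;
// files at or above this size must go through the streaming multipart copy instead.
private val maxSingleCopySize = 5368709120L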
/**
* Request a standard S3 bucket->bucket copy. This only works on files less than 5GB in size; larger files
* must be streamed through `ImprovedLargeFileCopier.performCopy` instead. In order to maintain compatibility
* between the two implementations, the return value is shaped like a multipart result (`CompletedUpload`)
* even though multipart upload is not used here.
*
* @param destBucket bucket to copy into
* @param sourceBucket bucket to copy from
* @param path path of the file to copy
* @param s3Client implicitly provided S3 client object
* @return a Future containing a `CompletedUpload`, which fails on error. Note that the copy itself runs
* synchronously on the calling thread, since the SDK v2 synchronous client is used.
*/
def standardS3Copy(destBucket:String, sourceBucket:String, path:String)(implicit s3Client:S3Client):Future[ImprovedLargeFileCopier.CompletedUpload] =
Future.fromTry(Try {
logger.info(s"Copying ${sourceBucket}:${path} to ${destBucket}:${path}")
val result = s3Client.copyObject(CopyObjectRequest.builder()
.sourceBucket(sourceBucket)
.sourceKey(path)
.destinationBucket(destBucket)
.destinationKey(path)
.build())
logger.info("Copy succeeded")
ImprovedLargeFileCopier.CompletedUpload(s"s3://$destBucket/$path", destBucket, path, result.copyObjectResult().eTag(), None, None, None, None)
})
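
// A minimal usage sketch for `standardS3Copy`. The bucket and key names are illustrative only,
// and in this codebase the client would normally come from S3ClientManager rather than being
// built directly:
//
//   implicit val client:S3Client = S3Client.builder().region(Region.EU_WEST_1).build()
//   standardS3Copy("my-dest-bucket", "my-source-bucket", "path/to/media.mxf")
//     .foreach(upload=>logger.info(s"Copied to s3://${upload.bucket}/${upload.key}"))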
// Optional AWS profile name from configuration; if unset, S3ClientManager falls back to its default credentials handling
val maybeProfile = config.getOptional[String]("externalData.awsProfile")
override def receive: Receive = {
case PerformStep(currentState)=>
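//note that the client must target the destination region: an S3 CopyObject request is issued against the destination bucket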
implicit val s3Client = s3ClientManager.getS3Client(region=Some(Region.of(currentState.destRegion)),profileName=maybeProfile)
currentState.entry match {
case None=>
sender() ! StepFailed(currentState, "No archive entry source")
case Some(entry)=>
val originalSender = sender()
val copyFuture = if(entry.size<maxSingleCopySize) { //files of 5GB and over can't be directly copied and must be re-uploaded
standardS3Copy(currentState.destBucket, entry.bucket, entry.path)
} else {
val rgn = entry.region.map(Regions.fromName).getOrElse(Regions.EU_WEST_1)
largeFileCopier.performCopy(rgn,
Some(s3ClientManager.newCredentialsProvider(maybeProfile)),
entry.bucket, entry.path, None, currentState.destBucket, entry.path)
}
copyFuture.onComplete({
case Success(result)=>
logger.info(s"Successfully copied ${entry.path} to s3://${result.bucket}/${result.key}")
val updatedState = currentState.copy(destFileId = Some(makeDocId(currentState.destBucket, entry.path)))
originalSender ! StepSucceeded(updatedState)
case Failure(err)=>
logger.error(s"Could not copy s3://${entry.bucket}/${entry.path} to s3://${currentState.destBucket}/${entry.path}: ${err.getMessage}", err)
originalSender ! StepFailed(currentState, err.getMessage)
})
}
case RollbackStep(currentState)=>
val destClient = s3ClientManager.getS3Client(region=Some(Region.of(currentState.destRegion)),profileName=maybeProfile)
val sourceClient = s3ClientManager.getS3Client(region=currentState.entry.flatMap(_.region).map(Region.of),profileName=maybeProfile)
currentState.entry match {
case None=>
sender() ! StepFailed(currentState, "No archive entry source")
case Some(entry)=>
val originalSender = sender()
logger.info(s"Rolling back failed file move, going to delete ${currentState.destBucket}:${entry.path} if ${entry.bucket}:${entry.path} exists")
val copyBackFuture = if(!sourceClient.doesObjectExist(entry.bucket, entry.path).get){
//if the file no longer exists in the source bucket, then copy it back from the destination
if(entry.size<maxSingleCopySize) {
logger.info(s"File no longer exists on s3://${entry.bucket}/${entry.path}, copying it back with standard copy...")
standardS3Copy(entry.bucket, currentState.destBucket, entry.path)(destClient).map(result=>Some(result))
} else {
logger.info(s"File no longer exists on s3://${entry.bucket}/${entry.path}, copying it back with large-file copy...")
val rgn = entry.region.map(Regions.fromName).getOrElse(Regions.EU_WEST_1)
//note the reversed bucket order compared to the forward copy: here we copy from the destination bucket back to the source
largeFileCopier.performCopy(rgn,
Some(s3ClientManager.newCredentialsProvider(maybeProfile)),
currentState.destBucket, entry.path, None, entry.bucket, entry.path).map(Some(_))
}
} else {
logger.info(s"File already exists on s3://${entry.bucket}/${entry.path}, no copy-back required")
Future.successful(None)
}
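//the for-comprehension sequences the two steps: any copy-back must complete successfully before
//the destination copy is deleted, so a failed restore can never lose the file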
val resultFut = for {
_ <- copyBackFuture
deleteResult <- Future.fromTry(Try { destClient.deleteObject(DeleteObjectRequest.builder().bucket(currentState.destBucket).key(entry.path).build()) })
} yield deleteResult
resultFut.onComplete({
case Success(_)=>
originalSender ! StepSucceeded(currentState.copy(destFileId = None))
case Failure(err)=>
logger.error(s"Could not rollback copy for $entry: ${err.getMessage}", err)
originalSender ! StepFailed(currentState, err.toString)
})
}
}
}
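
// A rough interaction sketch, assuming the surrounding FileMove step machinery; the actor
// reference and state value names here are illustrative, not taken from this file:
//
//   copyMainFile ! PerformStep(state)         // replies StepSucceeded(stateWithDestFileId) or StepFailed(state, msg)
//   copyMainFile ! RollbackStep(updatedState) // restores the source if missing, then deletes the copied file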