app/services/FileMoveActor.scala (107 lines of code) (raw):

package services import java.time.ZonedDateTime import java.util.UUID import akka.actor.{Actor, ActorRef, ActorSystem, Props} import akka.stream.{ActorMaterializer, Materializer} import com.theguardian.multimedia.archivehunter.common.{Indexer, ProxyLocationDAO} import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager, S3ClientManager} import com.theguardian.multimedia.archivehunter.common.cmn_models.{JobModel, JobModelDAO, JobStatus, ScanTarget, SourceType} import javax.inject.{Inject, Singleton} import play.api.Configuration import services.FileMove.GenericMoveActor.MoveActorMessage import services.FileMove.{CopyMainFile, CopyProxyFiles, DeleteOriginalFiles, GenericMoveActor, ImprovedLargeFileCopier, UpdateIndexRecords, VerifyChecksum, VerifySource} import akka.pattern.ask import org.slf4j.LoggerFactory import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.util.{Failure, Success} //step one: verify file exists [VerifySource[ //step two: verify dest collection exists [Inline] //step three: gather proxies [VerifySource] //step four: copy file to new location [CopyMainFile] //step five: copy proxies to new location [CopyProxyFile] //step six: if all copies succeed, update index records [UpdateIndexRecords] //step seven: remove original files object FileMoveActor { case class MoveFile(sourceFileId:String, destination:ScanTarget, remoteMessageId:Option[String], queueActor:ActorRef) extends MoveActorMessage //replies case class MoveSuccess(sourceFileId:String, remoteMessageId:Option[String]) extends MoveActorMessage @deprecated case class MoveAsync(jobId:String) extends MoveActorMessage case class MoveFailed(sourceFileId:String, reason:String, remoteMessageId:Option[String]) extends MoveActorMessage } /** * this actor uses the same technique as Project Locker to run a step-function and roll back all successful stages if a * stage fails */ @Singleton class FileMoveActor @Inject() (config:Configuration, proxyLocationDAO: ProxyLocationDAO, esClientManager:ESClientManager, dynamoClientManager: DynamoClientManager, jobModelDAO: JobModelDAO, s3ClientManager: S3ClientManager, largeFileCopier: ImprovedLargeFileCopier)(implicit system:ActorSystem, mat:Materializer) extends Actor { import FileMoveActor._ import GenericMoveActor._ import services.FileMove.GenericMoveActor._ private val logger = LoggerFactory.getLogger(getClass) private implicit val esClient = esClientManager.getClient() private implicit val dynamoClient = dynamoClientManager.getNewAsyncDynamoClient(config.getOptional[String]("externalData.awsProfile")) val indexName = config.getOptional[String]("externalData.indexName").getOrElse("archivehunter") private val indexer = new Indexer(indexName) private implicit val timeout:akka.util.Timeout = 1200 seconds //time out after 20 minutes protected val fileMoveChain:Seq[ActorRef] = Seq( system.actorOf(Props(new VerifySource(indexer, proxyLocationDAO))), system.actorOf(Props(new CopyMainFile(s3ClientManager, config, largeFileCopier))), system.actorOf(Props(new VerifyChecksum(s3ClientManager, config))), system.actorOf(Props(new CopyProxyFiles(s3ClientManager, config))), system.actorOf(Props(new UpdateIndexRecords(indexer, proxyLocationDAO))), system.actorOf(Props(new DeleteOriginalFiles(s3ClientManager, indexer, config))) ) def runNextActorInChain(initialData:FileMoveTransientData, otherSteps:Seq[ActorRef]):Future[Either[StepFailed,StepSucceeded]] = { logger.debug(s"runNextActorInChain: remaining chain is $otherSteps") if(otherSteps.isEmpty) return Future(Right(StepSucceeded(initialData))) val nextActor = otherSteps.head logger.debug(s"Sending PerformStep to $nextActor") (nextActor ? GenericMoveActor.PerformStep(initialData) ).mapTo[MoveActorMessage].flatMap({ case successMsg:StepSucceeded=> logger.debug(s"Step succeeded, moving to next") runNextActorInChain(successMsg.updatedData,otherSteps.tail) flatMap { case Left(failedMessage)=> //if the _next_ step fails, tell _this_ step to roll back (nextActor ? GenericMoveActor.RollbackStep(successMsg.updatedData)).map(_=>Left(failedMessage)) //overwrite return value with the original failure case Right(nextActorSuccess)=> Future(Right(nextActorSuccess)) } case failedMessage:StepFailed=> //if the step fails, tell it to roll back logger.error(s"StepFailed, sending rollback to $nextActor") (nextActor ? RollbackStep(initialData)).map(_=>Left(failedMessage)) case other:Any => logger.warn(s"got unexpected message: ${other.getClass}") Future(Left(StepFailed(initialData,"got unexpected message"))) }) } override def receive:Receive = { case MoveFile(sourceFileId, destination, remoteMessageId, queueActor)=> val originalSender = sender() val newJob = JobModel.newJob("FileMove",sourceFileId,SourceType.SRC_MEDIA).copy(jobStatus = JobStatus.ST_RUNNING) jobModelDAO .putJob(newJob) .map(_=>{ val setupData = FileMoveTransientData.initialise(sourceFileId, destination.bucketName, destination.proxyBucket, destination.region) logger.info(s"Setting up file move with initial data $setupData") runNextActorInChain(setupData, fileMoveChain).map({ case Right(_) => logger.info(s"File move for $sourceFileId -> ${destination.bucketName} completed successfully") val finalJob = newJob.copy(jobStatus = JobStatus.ST_SUCCESS, completedAt = Some(ZonedDateTime.now())) jobModelDAO.putJob(finalJob) queueActor ! MoveSuccess(sourceFileId,remoteMessageId) case Left(errMsg) => logger.error(s"File move for $sourceFileId -> ${destination.bucketName} failed: ${errMsg.err}") val finalJob = newJob.copy(jobStatus = JobStatus.ST_ERROR, log=Some(errMsg.err), completedAt = Some(ZonedDateTime.now())) jobModelDAO.putJob(finalJob) queueActor ! MoveFailed(sourceFileId, errMsg.err, remoteMessageId) }).recover({ case err: Throwable => val finalJob = newJob.copy(jobStatus = JobStatus.ST_ERROR, log=Some(err.toString), completedAt = Some(ZonedDateTime.now())) jobModelDAO.putJob(finalJob) logger.error(s"File move processor crashed: ", err) queueActor ! MoveFailed(sourceFileId, err.getMessage, remoteMessageId) }) }).recover({ case err:Throwable=> logger.error(s"Could not move file with id $sourceFileId to destination $destination: ${err.getMessage}", err) originalSender ! MoveFailed(sourceFileId, err.toString, remoteMessageId) }) } }