app/services/GlacierRestoreActor.scala (233 lines of code) (raw):

package services

import java.time.temporal.{ChronoUnit, TemporalField}
import java.time.{Instant, ZoneId, ZonedDateTime}

import akka.actor.{Actor, ActorRef, ActorSystem}
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, Indexer}
import com.theguardian.multimedia.archivehunter.common.clientManagers.{ESClientManager, S3ClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_helpers.S3RestoreHeader
import com.theguardian.multimedia.archivehunter.common.cmn_models._
import javax.inject.{Inject, Singleton}
import play.api.{Configuration, Logger}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{ArchiveStatus, HeadObjectResponse, RestoreObjectRequest, RestoreRequest, StorageClass}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._

/**
  * Message protocol for [[GlacierRestoreActor]].
  */
object GlacierRestoreActor {
  trait GRMsg

  /* public messages to send */

  /**
    * Request a restore of `entry` on behalf of a lightbox entry. The actor replies with
    * [[RestoreSuccess]] once the request has been issued and the lightbox entry updated, or with
    * [[RestoreFailure]] otherwise.
    * @param entry      archive entry identifying the S3 object to restore
    * @param lbEntry    lightbox entry whose restore status is updated alongside the request
    * @param expiration number of days passed to the S3 restore request; when None the configured default is used
    */
  case class InitiateRestore(entry:ArchiveEntry, lbEntry:LightboxEntry, expiration:Option[Int]) extends GRMsg

  /**
    * Request a restore of `entry` without touching any lightbox entry. The actor replies with
    * [[RestoreInProgress]] or [[RestoreFailure]].
    * @param entry      archive entry identifying the S3 object to restore
    * @param expiration number of days passed to the S3 restore request; when None the configured default is used
    */
  case class InitiateRestoreBasic(entry:ArchiveEntry, expiration:Option[Int]) extends GRMsg

  /**
    * Check the restore state of the item referenced by `lbEntry.fileId`, updating the lightbox entry
    * and any open RESTORE jobs for that item to match what S3 reports.
    */
  case class CheckRestoreStatus(lbEntry:LightboxEntry) extends GRMsg

  /**
    * Check the restore state of `archiveEntry` only; no lightbox entry or job records are updated.
    */
  case class CheckRestoreStatusBasic(archiveEntry: ArchiveEntry) extends GRMsg

  /* private internal messages */

  /**
    * Internal message carrying everything needed to query S3 and reply to the original requester.
    */
  case class InternalCheckRestoreStatus(lbEntry:Option[LightboxEntry], entry:ArchiveEntry, jobsList:Option[List[JobModel]], originalSender:ActorRef)

  /* reply messages to expect */
  case object RestoreSuccess extends GRMsg
  case class RestoreFailure(err:Throwable) extends GRMsg
  case class NotInArchive(entry:ArchiveEntry) extends GRMsg
  case class RestoreNotRequested(entry:ArchiveEntry) extends GRMsg
  case class RestoreInProgress(entry:ArchiveEntry) extends GRMsg
  /* declared as part of the protocol; not sent by any code in this file */
  case class RestoreExpired(entry:ArchiveEntry) extends GRMsg
  case class RestoreCompleted(entry:ArchiveEntry, expiresAt:ZonedDateTime) extends GRMsg
  case class ItemLost(entry:ArchiveEntry) extends GRMsg
}

/**
  * Actor that issues S3 restore requests for archived items and reports on their restore state,
  * keeping lightbox entries and RESTORE job records in step with what S3 reports.
  */
@Singleton
class GlacierRestoreActor @Inject() (config:Configuration, esClientMgr:ESClientManager, s3ClientMgr:S3ClientManager,
                                     jobModelDAO: JobModelDAO, lbEntryDAO:LightboxEntryDAO, system:ActorSystem) extends Actor {
  import GlacierRestoreActor._
  import com.theguardian.multimedia.archivehunter.common.cmn_helpers.S3ClientExtensions._

  private val logger = Logger(getClass)

  implicit val ec:ExecutionContext = system.getDispatcher

  /** Number of days requested for a restore when the caller supplies none; read from `archive.restoresExpireAfter`, falling back to 3. */
  val defaultExpiry = config.getOptional[Int]("archive.restoresExpireAfter").getOrElse(3)
  logger.info(s"Glacier restores will expire after $defaultExpiry days")

  private val indexer = new Indexer(config.get[String]("externalData.indexName"))
  implicit val esClient = esClientMgr.getClient()

  /**
    * Builds an S3 client for the region recorded on `entry`, using the optional
    * `externalData.awsProfile` configuration value.
    */
  private def s3ClientFor(entry:ArchiveEntry) =
    s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"), entry.region.map(Region.of))

  /**
    * Stamps the lightbox entry with a restore start time of "now" and saves it. The status becomes
    * RS_ERROR when `error` is supplied (its string form is stored as the last error), otherwise RS_UNDERWAY.
    * @param lbEntry        lightbox entry to update
    * @param availableUntil accepted for call-site compatibility; it is not written to the entry here
    * @param error          failure to record, if any
    * @return the result of `lbEntryDAO.put`
    */
  def updateLightbox(lbEntry:LightboxEntry, availableUntil:Option[ZonedDateTime]=None,error:Option[Throwable]=None) = {
    val newStatus = error match {
      case Some(_)=>RestoreStatus.RS_ERROR
      case None=>RestoreStatus.RS_UNDERWAY
    }

    val updatedEntry = lbEntry.copy(
      restoreStarted = Some(ZonedDateTime.now()),
      restoreStatus = newStatus,
      lastError = error.map(_.toString)
    )
    lbEntryDAO.put(updatedEntry)
  }

  /**
    * Saves a copy of `jobModel` with the given status and log, and with the completion time set to "now".
    * @return the result of `jobModelDAO.putJob`
    */
  def updateJob(jobModel: JobModel, newStatus:JobStatus.Value, newLog:Option[String]) = {
    val updatedModel = jobModel.copy(jobStatus = newStatus, log=newLog, completedAt=Some(ZonedDateTime.now()))
    jobModelDAO.putJob(updatedModel)
  }

  /**
    * Saves the lightbox entry with a new restore status. RS_SUCCESS also records the completion time
    * and `expiryTime`; RS_ERROR records the completion time only; any other status changes just the status.
    * @return the result of `lbEntryDAO.put`
    */
  def updateLightboxFull(lbEntry:LightboxEntry, newStatus:RestoreStatus.Value, expiryTime:Option[ZonedDateTime]) = {
    val updatedEntry = newStatus match {
      case RestoreStatus.RS_SUCCESS=>
        lbEntry.copy(restoreStatus = newStatus, restoreCompleted = Some(ZonedDateTime.now()), availableUntil = expiryTime)
      case RestoreStatus.RS_ERROR=>
        lbEntry.copy(restoreStatus = newStatus, restoreCompleted = Some(ZonedDateTime.now()))
      case _=>
        lbEntry.copy(restoreStatus = newStatus)
    }
    lbEntryDAO.put(updatedEntry)
  }

  /**
    * Marks the lightbox entry, when there is one, as RS_EXPIRED.
    * @param maybeLBEntry lightbox entry to update, if any
    * @param bucket       not used by this method
    * @param path         not used by this method
    * @return a Future of Some(update result) when an entry was given, otherwise an already-completed Future of None
    */
  def updateLightboxExpired(maybeLBEntry:Option[LightboxEntry], bucket:String, path:String) = maybeLBEntry match {
    case Some(lb)=>
      updateLightboxFull(lb, RestoreStatus.RS_EXPIRED, None).map(Some.apply)
    case None=>
      // nothing to compute, so there is no need to schedule work on the execution context
      Future.successful(None)
  }

  /**
    * Interprets the restore header of a HEAD response, updates any supplied job and lightbox records
    * to match, and sends the corresponding reply message to `originalSender`.
    * @param result         HEAD response for the object
    * @param entry          archive entry the response belongs to
    * @param lbEntry        lightbox entry to keep in step, if any
    * @param jobs           job records to keep in step, if any
    * @param originalSender actor that receives the reply
    */
  private def checkStatus(result:HeadObjectResponse, entry: ArchiveEntry, lbEntry: Option[LightboxEntry], jobs:Option[List[JobModel]], originalSender:ActorRef) = {
    logger.info(s"Got metadata for s3://${entry.bucket}/${entry.path} @${entry.maybeVersion.getOrElse("LATEST")}. Archive status is ${result.archiveStatusAsString()}")
    // result.restore() is a Java accessor that may return null, hence the Option wrapper
    val maybeRestoreStatus = Option(result.restore()).map(S3RestoreHeader.apply)
    logger.info(s"s3://${entry.bucket}/${entry.path} @${entry.maybeVersion.getOrElse("LATEST")} restore status is ${maybeRestoreStatus}")

    maybeRestoreStatus match {
      case None=>
        logger.info(s"s3://${entry.bucket}/${entry.path} @${entry.maybeVersion.getOrElse("LATEST")} - there is no restore header in the head response")
        //will only update if there _is_ a lightbox entry to update
        updateLightboxExpired(lbEntry, entry.bucket, entry.path)
        originalSender ! RestoreNotRequested(entry)
      case Some(Failure(err)) =>
        logger.info(s"s3://${entry.bucket}/${entry.path} could not check restore status: ${err.getMessage}")
        if(result.storageClass()!=StorageClass.GLACIER){
          logger.info(s"s3://${entry.bucket}/${entry.path} storage class is ${result.storageClassAsString()}, assuming not in archive")
          originalSender ! NotInArchive(entry)
        } else {
          logger.warn(s"s3://${entry.bucket}/${entry.path} there is no restore record in S3")
          jobs.foreach(_.foreach(job=>updateJob(job, JobStatus.ST_ERROR,Some("No restore record in S3"))))
          lbEntry.foreach(lb=>updateLightbox(lb,None,error=Some(new RuntimeException("No restore record in S3"))))
          originalSender ! RestoreNotRequested(entry)
        }
      case Some(Success(S3RestoreHeader(true, _))) =>
        //restore is in progress
        logger.info(s"s3://${entry.bucket}/${entry.path} is currently under restore: $maybeRestoreStatus")
        jobs.foreach(_.foreach(job=>updateJob(job, JobStatus.ST_RUNNING, None)))
        lbEntry.foreach(lb=>updateLightboxFull(lb,RestoreStatus.RS_UNDERWAY,None))
        originalSender ! RestoreInProgress(entry)
      case Some(Success(S3RestoreHeader(false, None))) =>
        //restore not in progress, but no expiry time - expiry time is probably passed
        logger.info(s"s3://${entry.bucket}/${entry.path} is not currently under restore and probably expired: $maybeRestoreStatus")
        updateLightboxExpired(lbEntry, entry.bucket, entry.path)
        originalSender ! RestoreNotRequested(entry)
      case Some(Success(S3RestoreHeader(false, Some(expiry)))) =>
        //restore not in progress, it's available unless the expiry time is already passed
        logger.info(s"s3://${entry.bucket}/${entry.path} has completed restore: ${maybeRestoreStatus}")
        jobs.foreach(_.foreach(job=>updateJob(job,JobStatus.ST_SUCCESS,None)))
        lbEntry.foreach(lb=>updateLightboxFull(lb, RestoreStatus.RS_SUCCESS, Some(expiry)))
        originalSender ! RestoreCompleted(entry, expiry)
    }
  }

  /**
    * Issues an S3 restore request for the given object.
    * @param bucket       bucket holding the object
    * @param key          key of the object
    * @param maybeVersion specific version to restore; omitted from the request when None
    * @param maybeExpiry  number of days for the restore request; `defaultExpiry` is used when None
    * @param client       implicitly provided S3 client to issue the request with
    * @return a Try holding the restore response, or the exception thrown by the client
    */
  private def initiateRestore(bucket:String, key:String, maybeVersion:Option[String], maybeExpiry:Option[Int])(implicit client:S3Client) = {
    val days = maybeExpiry.getOrElse(defaultExpiry)
    val restoreSpec = RestoreRequest.builder().days(days).build()
    val baseRequest = RestoreObjectRequest.builder().bucket(bucket).key(key).restoreRequest(restoreSpec)

    val withVersion = maybeVersion match {
      case Some(ver)=> baseRequest.versionId(ver)
      case None=> baseRequest
    }

    Try { client.restoreObject(withVersion.build()) }
  }

  override def receive: Receive = {
    case InternalCheckRestoreStatus(lbEntry, entry, jobs, originalSender)=>
      logger.info(s"Checking restore status for s3://${entry.bucket}/${entry.path}")
      logger.info(s"From lightbox entry $lbEntry")
      logger.info(s"With jobs $jobs")
      implicit val s3client = s3ClientFor(entry)

      s3client.getObjectMetadata(entry.bucket, entry.path, entry.maybeVersion) match {
        case Success(result)=>
          checkStatus(result, entry, lbEntry, jobs, originalSender)
        // NOTE(review): this pattern names the AWS SDK v1 exception type while the client in use is the
        // v2 S3Client; confirm that getObjectMetadata (S3ClientExtensions) surfaces errors as this type,
        // otherwise the 404 handling below is never reached and such errors fall to the generic case.
        case Failure(s3err:com.amazonaws.services.s3.model.AmazonS3Exception)=>
          if(s3err.getStatusCode==404) {
            logger.warn(s"Registered item s3://${entry.bucket}/${entry.path} does not exist any more!")
            if(entry.beenDeleted) {
              originalSender ! ItemLost(entry)
            } else {
              // flag the index record as deleted; the reply is sent without waiting for the index write
              val updatedEntry = entry.copy(beenDeleted = true)
              indexer.indexSingleItem(updatedEntry, Some(updatedEntry.id)).onComplete({
                case Success(Right(_))=>
                  logger.info(s"Item for s3://${entry.bucket}/${entry.path} marked as deleted")
                case Success(Left(indexerErr))=>
                  logger.error(s"Could not mark s3://${entry.bucket}/${entry.path} as deleted: $indexerErr")
                case Failure(err)=>
                  logger.error(s"Could not mark item for s3://${entry.bucket}/${entry.path} as deleted: ${err.getMessage}", err)
              })
              originalSender ! ItemLost(updatedEntry)
            }
          } else {
            logger.warn(s"Could not check restore status due to an s3 error s3://${entry.bucket}/${entry.path}: ${s3err.getMessage}", s3err)
            originalSender ! RestoreFailure(s3err)
          }
        case Failure(err)=>
          logger.warn(s"Could not check restore status for s3://${entry.bucket}/${entry.path}: ${err.getMessage}", err)
          originalSender ! RestoreFailure(err)
      }

    case CheckRestoreStatusBasic(archiveEntry)=>
      self ! InternalCheckRestoreStatus(None, archiveEntry, None, sender())

    case CheckRestoreStatus(lbEntry)=>
      // capture the sender now; sender() must not be called from inside a Future callback
      val originalSender = sender()

      val lookup = for {
        entry <- indexer.getById(lbEntry.fileId)
        jobResults <- jobModelDAO.jobsForSource(entry.id)
      } yield (entry, jobResults)

      lookup.onComplete({
        case Success((entry, jobResults))=>
          val failures = jobResults.collect({ case Left(err) => err })
          if (failures.nonEmpty) {
            logger.error("Could not retrieve jobs records: ")
            failures.foreach(err => logger.error(err.toString))
            originalSender ! RestoreFailure(new RuntimeException("could not retrieve job records"))
          } else {
            // only RESTORE jobs that have not already finished (successfully or not) are kept in step
            val jobs = jobResults.collect({ case Right(x) => x })
              .filter(_.jobType == "RESTORE")
              .filter(_.jobStatus != JobStatus.ST_ERROR)
              .filter(_.jobStatus != JobStatus.ST_SUCCESS)
            self ! InternalCheckRestoreStatus(Some(lbEntry), entry, Some(jobs), originalSender)
          }
        case Failure(err)=>
          // without this the requester would never get a reply when either lookup fails
          logger.error(s"Could not look up restore details for lightbox file id ${lbEntry.fileId}: ${err.getMessage}", err)
          originalSender ! RestoreFailure(err)
      })

    case InitiateRestoreBasic(entry, maybeExpiry)=>
      implicit val s3client = s3ClientFor(entry)
      val newJob = JobModel.newJob("RESTORE",entry.id,SourceType.SRC_MEDIA)

      val originalSender = sender()

      //completion is detected by the inputLambda, and the job status will be updated there.
      jobModelDAO.putJob(newJob).onComplete({
        case Success(_)=>
          logger.debug(s"${entry.location}: initiating restore")
          initiateRestore(entry.bucket, entry.path, entry.maybeVersion, maybeExpiry) match {
            case Success(_)=>
              // reply with a message instance; the bare name would send the case class companion object
              originalSender ! RestoreInProgress(entry)
            case Failure(err)=>
              logger.error("S3 restore request failed: ", err)
              originalSender ! RestoreFailure(err)
          }
        case Failure(err)=>
          logger.error(s"Could not update job info for ${newJob.jobId} on ${entry.location}: ${err.getMessage}", err)
          originalSender ! RestoreFailure(err)
      })

    case InitiateRestore(entry, lbEntry, maybeExpiry)=>
      implicit val s3client = s3ClientFor(entry)
      val inst = Instant.now().plus(maybeExpiry.getOrElse(defaultExpiry).toLong, ChronoUnit.DAYS)
      val willExpire = ZonedDateTime.ofInstant(inst,ZoneId.systemDefault())
      val newJob = JobModel.newJob("RESTORE",entry.id,SourceType.SRC_MEDIA)

      val originalSender = sender()

      val restoreResult = for {
        _ <- jobModelDAO.putJob(newJob)
        result <- Future.fromTry{
          logger.debug(s"${entry.location}: initiating restore")
          initiateRestore(entry.bucket, entry.path, entry.maybeVersion, maybeExpiry)
        }
      } yield result

      restoreResult.onComplete({
        case Success(restoreObjectResult)=>
          logger.info(s"Restore started for ${entry.location}, requestor charged? ${restoreObjectResult.requestChargedAsString()}, output path ${restoreObjectResult.restoreOutputPath()}")
          updateLightbox(lbEntry, availableUntil=Some(willExpire)).andThen({
            case _ => originalSender ! RestoreSuccess
          })
        case Failure(ex:akka.stream.BufferOverflowException)=>
          logger.debug(ex.toString)
          logger.warn(s"Caught buffer overflow exception, retrying operation in 5s")
          // pass the original requester as the sender of the retried message so that the eventual
          // reply reaches it; the implicit sender here would otherwise be this actor itself
          system.scheduler.scheduleOnce(5.seconds,self,InitiateRestore(entry,lbEntry,maybeExpiry))(ec, originalSender)
        case Failure(ex)=>
          logger.error(s"Could not restore ${entry.location}", ex)
          updateLightbox(lbEntry, error=Some(ex)).andThen({
            case _=>originalSender ! RestoreFailure(ex)
          })
      })
  }
}