app/helpers/LightboxStreamComponents/BulkRestoreStatsSink.scala (65 lines of code) (raw):

package helpers.LightboxStreamComponents import akka.actor.ActorRef import akka.stream.{Attributes, Inlet, SinkShape} import akka.stream.stage.{AbstractInHandler, GraphStageLogic, GraphStageWithMaterializedValue} import com.theguardian.multimedia.archivehunter.common.cmn_models.LightboxEntry import javax.inject.{Inject, Named} import models.BulkRestoreStats import akka.pattern.ask import play.api.Logger import services.GlacierRestoreActor._ import scala.concurrent.{Await, Future, Promise} import scala.util.{Failure, Success, Try} import scala.concurrent.duration._ /** * an Akka streams sink that takes in a stream of LightboxEntries, checks the archive status on them and at the end * materializes an object of [[BulkRestoreStats]] showing what state they are in. * you should get hold of this using an injector, i.e. val sinkFactory = injector.getInstance(classOf[BulkRestoreStatsSink]) * * @param glacierRestoreActor actorRef pointing to GlacierRestoreActor. Get this using an injector. */ class BulkRestoreStatsSink @Inject() (@Named("glacierRestoreActor") glacierRestoreActor:ActorRef) extends GraphStageWithMaterializedValue[SinkShape[LightboxEntry], Future[BulkRestoreStats]]{ private final val in:Inlet[LightboxEntry] = Inlet("BulkRestoreStatsSink.in") implicit val actorTimeout:akka.util.Timeout = 60 seconds override def shape: SinkShape[LightboxEntry] = SinkShape.of(in) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[BulkRestoreStats]) = { val logger = Logger(getClass) val completionPromise = Promise[BulkRestoreStats]() var currentStats:BulkRestoreStats = BulkRestoreStats.empty val logic = new GraphStageLogic(shape) { setHandler(in, new AbstractInHandler { override def onPush(): Unit = { val elem = grab(in) val result = Try { Await.result((glacierRestoreActor ? CheckRestoreStatus(elem)).mapTo[GRMsg], 60 seconds) } result match { case Failure(err)=> logger.error(s"Could not check archive status on ${elem.fileId}", err) failStage(err) case Success(ItemLost(_))=> logger.warn(s"${elem.fileId} has been lost!") currentStats = currentStats.copy(lost = currentStats.lost+1) case Success(NotInArchive(_))=> logger.info(s"${elem.fileId} is not in Glacier") currentStats = currentStats.copy(unneeded = currentStats.unneeded+1) case Success(RestoreNotRequested(_))=> logger.info(s"${elem.fileId} was not requested") currentStats = currentStats.copy(notRequested = currentStats.notRequested+1) case Success(RestoreInProgress(_))=> logger.info(s"${elem.fileId} is currently restoring") currentStats = currentStats.copy(inProgress = currentStats.inProgress+1) case Success(RestoreCompleted(_, _))=> logger.info(s"${elem.fileId} is available") currentStats = currentStats.copy(available = currentStats.available+1) case Success(RestoreFailure(err))=> logger.error(s"Could not check archive status on ${elem.fileId}", err) failStage(err) case Success(other)=> logger.error(s"Got unexpected message $other") failStage(new RuntimeException(s"Got unexpected message $other")) } pull(in) } }) override def preStart(): Unit = { pull(in) } override def postStop(): Unit = { completionPromise.complete(Success(currentStats)) } } (logic, completionPromise.future) } }