app/helpers/LightboxStreamComponents/InitiateRestoreSink.scala (42 lines of code) (raw):
package helpers.LightboxStreamComponents
import akka.actor.ActorRef
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.stage.{AbstractInHandler, GraphStage, GraphStageLogic, GraphStageWithMaterializedValue}
import com.google.inject.Inject
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, StorageClass}
import com.theguardian.multimedia.archivehunter.common.cmn_models.LightboxEntry
import javax.inject.Named
import play.api.Logger
import services.GlacierRestoreActor
import scala.concurrent.{Future, Promise}
/**
* This Sink takes in a stream of Tuple2[ArchiveEntry, LightboxEntry] and will ask GlacierRestoreActor to restore the item,
* if necessary. It materializes an Int of the count of items that it has actually requested a restore from.
* Normally, use DI to get this object - val sink = injector.getInstance(classOf[InitiateRestoreSink])
* @param glacierRestoreActor ActorRef to send the restore messages to
*/
class InitiateRestoreSink @Inject() (@Named("glacierRestoreActor") glacierRestoreActor: ActorRef )
extends GraphStageWithMaterializedValue[SinkShape[(ArchiveEntry, LightboxEntry)], Future[Int]]{
private final val in = Inlet[(ArchiveEntry, LightboxEntry)]("InitiateRestoreSink.in")
override def shape: SinkShape[(ArchiveEntry, LightboxEntry)] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val promise = Promise[Int]()
val logic = new GraphStageLogic(shape) {
private val logger = Logger(getClass)
private var ctr:Int = 0
setHandler(in, new AbstractInHandler {
override def onPush(): Unit = {
val elem = grab(in)
val archiveEntry = elem._1
archiveEntry.storageClass match {
case StorageClass.GLACIER =>
logger.info(s"${archiveEntry.path} is marked as Glacier, probably needs restore.")
//passing None as the expiry time means "use default"
glacierRestoreActor ! GlacierRestoreActor.InitiateRestore(archiveEntry, elem._2, None)
logger.info(s"${archiveEntry.path}: restore requested")
ctr+=1
case _ =>
logger.info(s"${archiveEntry.path} does not need restore")
}
pull(in)
}
})
override def preStart(): Unit = pull(in)
override def postStop(): Unit = promise.success(ctr)
}
(logic, promise.future)
}
}