app/helpers/LightboxStreamComponents/SaveLightboxEntryFlow.scala (54 lines of code) (raw):
package helpers.LightboxStreamComponents
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream._
import akka.stream.stage._
import com.sksamuel.elastic4s.http.{ElasticClient, HttpClient}
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, Indexer}
import com.theguardian.multimedia.archivehunter.common.cmn_models.{LightboxEntry, LightboxEntryDAO}
import helpers.LightboxHelper
import models.UserProfile
import play.api.Logger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
/**
* an akka Flow that creates LightboxEntry records for each ArchiveEntry that it receives, and outputs the saved LightboxEntry and incoming ARchiveEntry as a tuple
* @param bulkId ID of the bulk entry to add to
* @param userProfile UserProfile object for the user doing the adding
* @param lightboxEntryDAO implicitly provided Data Access Object for lightboxEntry
* @param system implicitly provided ActorSystem
* @param esClient implicitly provided HttpClient for Elastic Search
* @param indexer implicitly provided Indexer instance
*/
class SaveLightboxEntryFlow (bulkId:String,userProfile: UserProfile)
(implicit val lightboxEntryDAO:LightboxEntryDAO, system:ActorSystem, esClient:ElasticClient, indexer:Indexer)
extends GraphStageWithMaterializedValue[FlowShape[ArchiveEntry,(ArchiveEntry, LightboxEntry)], Future[Int]]{
private val in = Inlet.create[ArchiveEntry]("BulkAddSink.in")
private val out = Outlet.create[(ArchiveEntry, LightboxEntry)]("BulkAddSink.out")
override def shape: FlowShape[ArchiveEntry,(ArchiveEntry, LightboxEntry)] = FlowShape.of(in, out)
private implicit val ec:ExecutionContext = system.dispatcher
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic,Future[Int]) = {
val promise = Promise[Int]()
val logic = new GraphStageLogic(shape) {
private val logger = Logger(getClass)
private var ctr:Int = 0
setHandler(in, new AbstractInHandler {
override def onPush(): Unit = {
val elem = grab(in)
val saveFuture = LightboxHelper.saveLightboxEntry(userProfile, elem, Some(bulkId)).recover({
case err:Throwable=>Failure(err) //ensure that an outer exception is caught too
})
Await.result(saveFuture, 30 seconds) match {
case Success(entry)=>
logger.info("Saved lightbox entry")
ctr+=1
push(out, (elem, entry))
case Failure(err)=>
logger.error("Could not save lightbox entry", err)
//fail the output immediately, this should get seen by the stream's user
promise.failure(err)
failStage(err)
}
}
})
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = pull(in)
})
override def postStop(): Unit = {
//if we've already failed the promise no point generating random error messages
if(!promise.isCompleted) promise.success(ctr)
}
}
(logic, promise.future)
}
}