app/helpers/LightboxStreamComponents/UpdateLightboxIndexInfoSink.scala (53 lines of code) (raw):

package helpers.LightboxStreamComponents

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.stage.{AbstractInHandler, GraphStage, GraphStageLogic, GraphStageWithMaterializedValue}
import com.sksamuel.elastic4s.http.{ElasticClient, HttpClient}
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, Indexer}
import com.theguardian.multimedia.archivehunter.common.cmn_models.LightboxEntryDAO
import helpers.LightboxHelper
import models.UserProfile
import org.slf4j.MDC
import play.api.Logger

import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

/**
  * An akka Sink that calls [[LightboxHelper.updateIndexLightboxed]] for every incoming ArchiveEntry,
  * passing the given user profile, avatar URL and bulk ID along with the entry.
  *
  * Elements are processed strictly one at a time: the stage blocks (for up to 30 seconds) on each update
  * before pulling the next element.
  *
  * The materialized value is a Future that completes with the number of entries that were updated
  * successfully once the stage stops, or fails with the causing exception if an update fails or times out,
  * or if upstream fails.
  *
  * @param bulkId ID of the bulk entry that is passed on with each update
  * @param userProfile UserProfile object for the user doing the adding
  * @param userAvatarUrl optional avatar URL for the user, passed through to the update call
  * @param lightboxEntryDAO implicitly provided Data Access Object for lightboxEntry
  * @param system implicitly provided ActorSystem; its dispatcher is used as the ExecutionContext
  * @param esClient implicitly provided ElasticClient for Elastic Search
  * @param indexer implicitly provided Indexer instance
  */
class UpdateLightboxIndexInfoSink (bulkId:String, userProfile: UserProfile, userAvatarUrl:Option[String])
                                  (implicit val lightboxEntryDAO:LightboxEntryDAO, system:ActorSystem, esClient:ElasticClient, indexer:Indexer)
  extends GraphStageWithMaterializedValue[SinkShape[ArchiveEntry], Future[Int]] {

  private val in = Inlet.create[ArchiveEntry]("BulkAddSink.in")

  override def shape: SinkShape[ArchiveEntry] = SinkShape.of(in)

  private implicit val ec:ExecutionContext = system.dispatcher

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
    val promise = Promise[Int]()

    val logic = new GraphStageLogic(shape) {
      private val logger = Logger(getClass)
      // number of entries updated successfully so far; only touched from within the stage callbacks
      private var ctr:Int = 0

      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = {
          val elem = grab(in)

          // Deliberately blocking: the next element is only requested once this update has finished.
          val response = Try {
            Await.result(LightboxHelper.updateIndexLightboxed(userProfile, userAvatarUrl, elem, Some(bulkId)), 30.seconds)
          }

          response match {
            case Success(_)=>
              logger.info("Saved lightbox entry")
              ctr+=1
              pull(in)
            case Failure(err)=>
              MDC.put("error",err.toString)
              try {
                logger.error(s"Could not update lightbox info: ${err.getMessage}", err)
              } finally {
                // MDC is thread-local; clear the key so it does not leak into unrelated log lines
                // emitted later from the same dispatcher thread.
                MDC.remove("error")
              }
              // tryFailure rather than failure: the promise must be completed exactly once and
              // postStop (triggered by failStage) will also attempt to complete it.
              promise.tryFailure(err)
              failStage(err)
          }
        }

        // Without this override an upstream failure would fall through to postStop and be
        // reported to the caller as a successful count.
        override def onUpstreamFailure(ex: Throwable): Unit = {
          promise.tryFailure(ex)
          failStage(ex)
        }
      })

      override def preStart(): Unit = {
        // a Sink has no downstream demand to react to, so request the first element ourselves
        pull(in)
      }

      override def postStop(): Unit = {
        // No-op if the promise was already failed above; a plain `success` would throw
        // IllegalStateException in that case.
        promise.trySuccess(ctr)
      }
    }

    (logic, promise.future)
  }
}