app/helpers/SearchHitToArchiveEntryFlow.scala (39 lines of code) (raw):

package helpers import akka.stream._ import akka.stream.stage._ import com.amazonaws.services.s3.AmazonS3 import com.sksamuel.elastic4s.http.search.SearchHit import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, ArchiveEntryHitReader} import play.api.Logger import scala.util.{Failure, Success} class SearchHitToArchiveEntryFlow extends GraphStage[FlowShape[SearchHit,ArchiveEntry]] with ArchiveEntryHitReader { final val in:Inlet[SearchHit] = Inlet.create("SearchHitToArchiveEntryFlow.in") final val out:Outlet[ArchiveEntry] = Outlet.create("SearchHitToArchiveEntryFlow.out") override def shape: FlowShape[SearchHit, ArchiveEntry] = { FlowShape.of(in,out) } override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private val logger = Logger(getClass) logger.debug("initialised new instance") setHandler(in, new AbstractInHandler { override def onPush(): Unit = { val elem = grab(in) ArchiveEntryHR.read(elem) match { case Failure(err)=> logger.error("Could not convert ElasticSearch record to archive entry:", err) pull(in) case Success(entry)=> logger.debug(s"Got archive entry for s3://${entry.bucket}/${entry.path}") push(out, entry) } } }) setHandler(out, new AbstractOutHandler { override def onPull(): Unit = { logger.debug("pull from downstream") pull(in) } }) } }