app/helpers/EOSDetect.scala (29 lines of code) (raw):

package helpers

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{AbstractInHandler, AbstractOutHandler, GraphStage, GraphStageLogic}
import play.api.Logger

import scala.concurrent.Promise
import scala.util.Success

/**
  * Simple pass-through stream flow that completes a Promise when the stage stops.
  *
  * Every element received on the inlet is forwarded unchanged to the outlet. When the stage's
  * `postStop` hook runs, `completionPromise` is completed successfully with `cbData`.
  *
  * The promise is completed with `tryComplete`, so if it has already been completed (for example
  * because the same stage instance was materialized more than once, or the caller completed it
  * elsewhere) the earlier result is kept and a warning is logged instead of throwing
  * `IllegalStateException` from `postStop`.
  *
  * @param completionPromise promise that is completed with `cbData` when the stage stops
  * @param cbData            value used to complete `completionPromise`
  * @tparam T type of the value carried by the completion promise
  * @tparam U type of the stream elements passing through the flow
  */
class EOSDetect[T, U](completionPromise: Promise[T], cbData: T) extends GraphStage[FlowShape[U, U]] {
  private val in: Inlet[U] = Inlet.create("EOSDetect.in")
  private val out: Outlet[U] = Outlet.create("EOSDetect.out")

  override def shape: FlowShape[U, U] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private val logger = Logger(getClass)

    // Forward each incoming element downstream untouched.
    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = push(out, grab(in))
    })

    // Propagate downstream demand straight to upstream.
    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = pull(in)
    })

    override def postStop(): Unit = {
      // tryComplete rather than complete: a promise that is already completed must not make
      // postStop throw, which would also skip super.postStop().
      if (!completionPromise.tryComplete(Success(cbData))) {
        logger.warn("EOSDetect: completion promise was already completed, leaving existing result in place")
      }
      super.postStop()
    }
  }
}