app/helpers/DigestSink.scala (44 lines of code) (raw):

package helpers import akka.stream.scaladsl.{GraphDSL, Sink} import akka.stream.{Attributes, Inlet, SinkShape} import akka.stream.stage.{AbstractInHandler, GraphStageLogic, GraphStageWithMaterializedValue} import akka.util.ByteString import java.security.MessageDigest import scala.concurrent.{Future, Promise} import scala.util.{Failure, Success} object DigestSink { def apply(algorithm:String) = { val factory = new DigestSink(algorithm) Sink.fromGraph(GraphDSL.createGraph(factory) { implicit builder=> s=> SinkShape.of(s.in) }) } } /** * Simple sink to perform a checksum on all the incoming data and return it at the stream end. * @param algorithm algorithm to checksum. Can by anything supported by MessageDigest.getInstance. */ class DigestSink(algorithm:String) extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[ByteString]]{ private final val in:Inlet[ByteString] = Inlet.create("DigestSink.in") override def shape: SinkShape[ByteString] = SinkShape.of(in) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[ByteString]) = { val completionPromise:Promise[ByteString] = Promise() val digester = MessageDigest.getInstance(algorithm) val logic = new GraphStageLogic(shape) { setHandler(in, new AbstractInHandler { override def onPush(): Unit = { val nextBlock = grab(in) digester.update(nextBlock.asByteBuffer) pull(in) } override def onUpstreamFinish(): Unit = { val result = ByteString(digester.digest()) completionPromise.complete(Success(result)) } override def onUpstreamFailure(ex: Throwable): Unit = { completionPromise.complete(Failure(ex)) } }) override def preStart(): Unit = { pull(in) } } (logic, completionPromise.future) } }