app/helpers/ContentHashingFlow.scala (33 lines of code) (raw):
package helpers
import java.security.MessageDigest
import akka.stream._
import akka.stream.stage.{AbstractInHandler, AbstractOutHandler, GraphStage, GraphStageLogic}
import akka.util.ByteString
class ContentHashingFlow(algo:String) extends GraphStage[FlowShape[ByteString, ByteString]]{
final val in:Inlet[ByteString] = Inlet.create("ContentHashingFlow.in")
final val out:Outlet[ByteString] = Outlet.create("ContentHashingFlow.out")
override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
val digester = MessageDigest.getInstance(algo)
setHandler(in, new AbstractInHandler {
override def onPush(): Unit = {
val elem = grab(in)
digester.update(elem.toArray)
pull(in)
}
override def onUpstreamFinish(): Unit = {
val result = digester.digest()
push(out, ByteString(result))
}
})
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = {
if(!isClosed(in)) {
pull(in)
} else {
completeStage()
}
}
})
}
}