app/streamcomponents/AuditLogFinish.scala (42 lines of code) (raw):

package streamcomponents

import akka.actor.ActorRef
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import models.{AuditEvent, AuditFile}
import org.slf4j.LoggerFactory

/**
  * Pass-through flow stage that forwards every `ByteString` unchanged while counting the bytes that
  * go through it, and reports the lifecycle of the stream to an audit actor:
  *
  *  - `STREAMOUT_START` when the stage starts;
  *  - `STREAMOUT_END` when the stage stops having passed at least `expectedBytes` bytes;
  *  - `STREAMOUT_SHORT` when the stage stops having passed fewer than `expectedBytes` bytes;
  *  - `OMERROR` when the upstream fails.
  *
  * @param auditLogActor actor that receives the `actors.Audit.LogEvent` messages
  * @param auditFile     file description attached to every audit event and log line
  * @param uid           identifier passed through to every audit event (presumably the requesting user - confirm against `actors.Audit.LogEvent`)
  * @param expectedBytes number of bytes that must pass through for the stream to count as complete
  */
class AuditLogFinish (auditLogActor:ActorRef, auditFile:AuditFile, uid:String, expectedBytes:Long) extends GraphStage[FlowShape[ByteString,ByteString]] {
  final val in:Inlet[ByteString] = Inlet.create("AuditLogFinish.in")
  final val out:Outlet[ByteString] = Outlet.create("AuditLogFinish.out")

  override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in,out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler with InHandler {
    private val logger = LoggerFactory.getLogger(getClass)
    // Running total of bytes pushed downstream; only ever touched from stage callbacks.
    private var bytesCounter:Long = 0

    /** Forwards the incoming chunk untouched, adding its length to the running total. */
    override def onPush(): Unit = {
      val elem = grab(in)
      bytesCounter += elem.length
      push(out, elem)
    }

    /** Relays downstream demand straight to the upstream. */
    override def onPull(): Unit = pull(in)

    /** Records the start of streaming in the application log and the audit log. */
    override def preStart(): Unit = {
      logger.info(s"Streaming started for $auditFile")
      auditLogActor ! actors.Audit.LogEvent(AuditEvent.STREAMOUT_START, uid, Some(auditFile), Seq(), None, None)
    }

    /**
      * Records the end of streaming. Sends `STREAMOUT_END` if at least `expectedBytes` were passed,
      * otherwise `STREAMOUT_SHORT` with the percentage of the expected size that was streamed.
      */
    override def postStop(): Unit = {
      logger.info(s"Streaming finished for $auditFile, passed $bytesCounter bytes")
      if(bytesCounter>=expectedBytes) {
        //bytesCounter could be more than expectedBytes if we are doing a multipart MIME response
        auditLogActor ! actors.Audit.LogEvent(AuditEvent.STREAMOUT_END, uid, Some(auditFile), Seq(), None, Some(bytesCounter))
      } else {
        // Defensive guard against dividing by zero; in this branch expectedBytes exceeds bytesCounter, which starts at 0 and only grows.
        val maybePct = if(expectedBytes>0) Some((bytesCounter.toDouble / expectedBytes.toDouble)*100) else None
        val maybePctString = maybePct.map(pct=>f"$pct%3.0f")
        auditLogActor ! actors.Audit.LogEvent(AuditEvent.STREAMOUT_SHORT, uid, Some(auditFile), Seq(), Some(s"Expected $expectedBytes bytes, streamed ${maybePctString.getOrElse("(unknown)")}% of file"), Some(bytesCounter))
      }
    }

    /**
      * Records the upstream failure in the application log and the audit log, then fails the stage.
      *
      * @param ex the exception the upstream failed with
      */
    override def onUpstreamFailure(ex: Throwable): Unit = {
      logger.error(s"Streaming failed for $auditFile on $bytesCounter bytes: ", ex)
      auditLogActor ! actors.Audit.LogEvent(AuditEvent.OMERROR,uid,Some(auditFile),Seq(),None,Some(bytesCounter))
      // Overriding this handler replaces the default "fail the stage" behaviour, so the failure must be
      // propagated explicitly; otherwise the outlet stays open and downstream never learns of the error.
      failStage(ex)
    }

    /** Keeps the default handling of downstream cancellation. */
    override def onDownstreamFinish(): Unit = super.onDownstreamFinish()

    setHandlers(in,out, this)
  }
}