// NOTE(review): this line looks like a truncated excerpt of the `shape` definition that appears in full
// further down (there it is built with `FlowShape.of(in,out)`) — confirm whether it belongs in the file.
override def shape: FlowShape[ByteString, ByteString] = FlowShape.of()

in app/streamcomponents/AuditLogFinish.scala [14:52]


  /**
    * The shape of this stage: one inlet and one outlet, both carrying `ByteString`,
    * built from the `in` and `out` ports.
    */
  override def shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

  /**
    * Builds the logic for this stage: a pass-through that forwards every chunk unchanged while counting
    * the bytes it has seen, and that reports the start, end and failure of the stream to `auditLogActor`.
    *
    * @param inheritedAttributes attributes supplied at materialization; not used by this stage
    * @return the stage logic, which is registered as the handler for both `in` and `out`
    */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler with InHandler  {
    private val logger = LoggerFactory.getLogger(getClass)
    // Running total of the bytes pushed downstream so far.
    private var bytesCounter:Long = 0

    /** Forwards the incoming chunk untouched, adding its length to the running byte count. */
    override def onPush(): Unit = {
      val elem=grab(in)
      bytesCounter+=elem.length
      push(out, elem)
    }

    /** Downstream demand is passed straight through to upstream. */
    override def onPull(): Unit = pull(in)

    /** Logs and audits the start of streaming. */
    override def preStart(): Unit = {
      logger.info(s"Streaming started for $auditFile")
      auditLogActor ! actors.Audit.LogEvent(AuditEvent.STREAMOUT_START, uid, Some(auditFile), Seq(), None, None)
    }

    /**
      * Logs the final byte count and audits either a complete stream-out (at least `expectedBytes` passed)
      * or a short one, including the percentage of the expected size that was streamed.
      */
    override def postStop(): Unit = {
      logger.info(s"Streaming finished for $auditFile, passed $bytesCounter bytes")
      if(bytesCounter>=expectedBytes) { //bytesCounter could be more than expectedBytes if we are doing a multipart MIME response
        auditLogActor ! actors.Audit.LogEvent(AuditEvent.STREAMOUT_END, uid, Some(auditFile), Seq(), None, Some(bytesCounter))
      } else {
        // Guard against dividing by zero when no (positive) expected size is available.
        val maybePct = if(expectedBytes>0) Some((bytesCounter.toDouble / expectedBytes.toDouble)*100) else None
        val maybePctString = maybePct.map(pct=>f"$pct%3.0f")
        auditLogActor ! actors.Audit.LogEvent(AuditEvent.STREAMOUT_SHORT, uid, Some(auditFile), Seq(), Some(s"Expected $expectedBytes bytes, streamed ${maybePctString.getOrElse("(unknown)")}% of file"), Some(bytesCounter))
      }
    }

    /**
      * Logs and audits an upstream failure, then fails the stage so the error reaches downstream.
      *
      * @param ex the exception that upstream failed with
      */
    override def onUpstreamFailure(ex: Throwable): Unit = {
      logger.error(s"Streaming failed for $auditFile on $bytesCounter bytes: ", ex)
      auditLogActor ! actors.Audit.LogEvent(AuditEvent.OMERROR,uid,Some(auditFile),Seq(),None,Some(bytesCounter))
      // Overriding this callback replaces the default handler, so the failure has to be propagated
      // explicitly; otherwise the outlet stays open and downstream is never told the stream broke.
      failStage(ex)
    }

    /** Keeps the default behaviour when downstream cancels. */
    override def onDownstreamFinish(): Unit = super.onDownstreamFinish()

    setHandlers(in,out, this)
  }