override def createLogic()

in http/src/main/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParser.scala [107:141]


  override def createLogic(attributes: Attributes) =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      import ServerSentEventParser._
      import shape._

      private val builder = new Builder()

      setHandlers(in, out, this)

      override def onPush() = {
        val line = grab(in)
        if (line == "") { // An event is terminated with a new line
          if (builder.hasData) // Events without data are ignored according to the spec
            push(out, builder.build())
          else
            pull(in)
          builder.reset()
        } else if (builder.size + line.length <= maxEventSize) {
          line match {
            case Id                                                    => builder.setId("")
            case Field(Data, data) if data.nonEmpty || emitEmptyEvents => builder.appendData(data)
            case Field(EventType, t) if t.nonEmpty                     => builder.setType(t)
            case Field(Id, id)                                         => builder.setId(id)
            case Field(Retry, s @ PosInt(r)) if r >= 0                 => builder.setRetry(r, s.length)
            case _                                                     => // ignore according to spec
          }
          pull(in)
        } else {
          failStage(new IllegalStateException(s"maxEventSize of $maxEventSize exceeded!"))
          builder.reset()
        }
      }

      override def onPull() = pull(in)
    }