in http/src/main/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/ServerSentEventParser.scala [107:141]
/**
 * Creates the stage logic that turns a stream of lines into server-sent events.
 *
 * Each line grabbed from `in` is either folded into the event under construction
 * or, when it is the empty string, used as the signal to finish that event.
 * The stage fails with an `IllegalStateException` when the size already
 * accumulated by the builder plus the length of the incoming line is greater
 * than `maxEventSize`.
 */
override def createLogic(attributes: Attributes) =
  new GraphStageLogic(shape) with InHandler with OutHandler {
    import ServerSentEventParser._
    import shape._

    // Collects the fields of the event that is currently being assembled.
    private val builder = new Builder()

    setHandlers(in, out, this)

    override def onPush(): Unit =
      grab(in) match {
        case "" => // An event is terminated with a new line
          finishEvent()
        case text if builder.size + text.length <= maxEventSize =>
          consumeField(text)
          pull(in)
        case _ =>
          failStage(new IllegalStateException(s"maxEventSize of $maxEventSize exceeded!"))
          builder.reset()
      }

    // Downstream demand is forwarded to upstream as is.
    override def onPull(): Unit = pull(in)

    /** Emits the accumulated event, or asks for the next line if there is nothing to emit. */
    private def finishEvent(): Unit = {
      // Events without data are ignored according to the spec
      if (builder.hasData) push(out, builder.build())
      else pull(in)
      // The event is built before the builder is cleared for the next one.
      builder.reset()
    }

    /** Applies a single non-empty line to the event under construction. */
    private def consumeField(text: String): Unit =
      text match {
        case Id =>
          builder.setId("")
        case Field(Data, payload) if payload.nonEmpty || emitEmptyEvents =>
          builder.appendData(payload)
        case Field(EventType, name) if name.nonEmpty =>
          builder.setType(name)
        case Field(Id, value) =>
          builder.setId(value)
        case Field(Retry, raw @ PosInt(number)) if number >= 0 =>
          builder.setRetry(number, raw.length)
        case _ =>
          () // ignore according to spec
      }
  }