override def createLogic()

in http/src/main/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/LineParser.scala [39:92]


  override def createLogic(attributes: Attributes) =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      import LineParser._
      import shape._

      private var buffer = ByteString.empty
      private var lastCharWasCr = false

      setHandlers(in, out, this)

      override def onPush() = {
        @tailrec
        def parseLines(
            bs: ByteString,
            from: Int = 0,
            at: Int,
            parsedLines: Vector[String] = Vector.empty,
            lastCharWasCr: Boolean = false): (ByteString, Vector[String], Boolean) =
          if (at >= bs.length)
            (bs.drop(from), parsedLines, lastCharWasCr)
          else
            bs(at) match {
              case CR if at < bs.length - 1 && bs(at + 1) == LF =>
                // Lookahead for LF after CR
                parseLines(bs, at + 2, at + 2, parsedLines :+ bs.slice(from, at).utf8String)
              case CR =>
                // if is a CR but we don't know the next character, slice it but flag that the last character was a CR so if the next happens to be a LF we just ignore
                parseLines(bs, at + 1, at + 1, parsedLines :+ bs.slice(from, at).utf8String, lastCharWasCr = true)
              case LF if lastCharWasCr =>
                // if is a LF and we just sliced a CR then we simply advance
                parseLines(bs, at + 1, at + 1, parsedLines)
              case LF =>
                // a LF that wasn't preceded by a CR means we found a new slice
                parseLines(bs, at + 1, at + 1, parsedLines :+ bs.slice(from, at).utf8String)
              case _ =>
                // for other input, simply advance
                parseLines(bs, from, at + 1, parsedLines)
            }

        // start the search where it ended, prevent iterating over all the buffer again
        val currentBufferStart = math.max(0, buffer.length - 1)
        buffer = parseLines(buffer ++ grab(in), at = currentBufferStart, lastCharWasCr = lastCharWasCr) match {
          case (remaining, _, _) if remaining.size > maxLineSize =>
            failStage(new IllegalStateException(s"maxLineSize of $maxLineSize exceeded!"))
            ByteString.empty // Clear buffer
          case (remaining, parsedLines, _lastCharWasCr) =>
            if (parsedLines.nonEmpty) emitMultiple(out, parsedLines) else pull(in)
            lastCharWasCr = _lastCharWasCr
            remaining
        }
      }

      override def onPull() = pull(in)
    }