override def createLogic()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/BodyPartParser.scala [65:286]


  override def createLogic(attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var output = collection.immutable.Queue.empty[Output] // FIXME this probably is too wasteful
      private var state: ByteString => StateResult = tryParseInitialBoundary
      private var shouldTerminate = false
      // Will be override at the beginning of the parsing (tryParseInitialBoundary and parsePreamble)
      // But initially defined here as norm version to avoid NPE
      private var eolConfiguration: EndOfLineConfiguration = UndefinedEndOfLineConfiguration(boundary)

      override def onPush(): Unit = {
        if (!shouldTerminate) {
          val elem = grab(in)
          try run(elem)
          catch {
            case e: ParsingException    => fail(e.info)
            case NotEnoughDataException =>
              // we are missing a try/catch{continue} wrapper somewhere
              throw new IllegalStateException("unexpected NotEnoughDataException", NotEnoughDataException)
          }
          if (output.nonEmpty) push(out, dequeue())
          else if (!shouldTerminate) pull(in)
          else completeStage()
        } else completeStage()
      }

      override def onPull(): Unit = {
        if (output.nonEmpty) push(out, dequeue())
        else if (isClosed(in)) {
          if (!shouldTerminate) push(out, ParseError(ErrorInfo("Unexpected end of multipart entity")))
          completeStage()
        } else pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        if (isAvailable(out)) onPull()
      }

      // hacky trampolining support: parsing states may call `trampoline(continuation)` to loop safely
      def run(data: ByteString): Unit = {
        @tailrec def loop(): Unit =
          trampoline match {
            case null =>
            case f =>
              trampoline = null
              f()
              loop()
          }

        state(data)
        loop()
      }
      private var trampoline: () => StateResult = null
      def trampoline(continue: => StateResult): StateResult = {
        require(trampoline eq null)
        trampoline = () => continue
        done()
      }

      setHandlers(in, out, this)

      def tryParseInitialBoundary(input: ByteString): StateResult =
        // we don't use boyerMoore here because we are testing for the boundary *without* a
        // preceding LF or CRLF and at a known location (the very beginning of the entity)
        try {
          eolConfiguration = eolConfiguration.defineOnce(input)
          if (eolConfiguration.isBoundary(input, 0)) {
            val ix = eolConfiguration.boundaryLength
            if (eolConfiguration.isEndOfLine(input, ix)) parseHeaderLines(input, ix + eolConfiguration.eolLength)
            else if (doubleDash(input, ix)) setShouldTerminate()
            else parsePreamble(input)
          } else parsePreamble(input)
        } catch {
          case NotEnoughDataException => continue(input, 0)((newInput, _) => tryParseInitialBoundary(newInput))
        }

      def parsePreamble(input: ByteString): StateResult =
        try {
          @tailrec def rec(index: Int): StateResult = {
            val needleEnd = eolConfiguration.boyerMoore.nextIndex(input, index) + eolConfiguration.needle.length
            if (eolConfiguration.isEndOfLine(input, needleEnd))
              parseHeaderLines(input, needleEnd + eolConfiguration.eolLength)
            else if (doubleDash(input, needleEnd)) setShouldTerminate()
            else rec(needleEnd)
          }
          eolConfiguration = eolConfiguration.defineOnce(input)
          rec(0)
        } catch {
          case NotEnoughDataException => continue(input, 0)((newInput, _) => parsePreamble(newInput))
        }

      @tailrec def parseHeaderLines(input: ByteString, lineStart: Int,
          headers: ListBuffer[HttpHeader] = ListBuffer[HttpHeader](),
          headerCount: Int = 0, cth: Option[`Content-Type`] = None): StateResult = {
        def contentType =
          cth match {
            case Some(x) => x.contentType
            case None    => defaultContentType
          }

        var lineEnd = 0
        val resultHeader =
          try {
            if (!eolConfiguration.isBoundary(input, lineStart)) {
              lineEnd = headerParser.parseHeaderLine(input, lineStart)()
              headerParser.resultHeader
            } else BoundaryHeader
          } catch {
            case NotEnoughDataException => null
          }
        resultHeader match {
          case null => continue(input, lineStart)(parseHeaderLinesAux(headers, headerCount, cth))

          case BoundaryHeader =>
            emit(BodyPartStart(headers.toList, _ => HttpEntity.empty(contentType)))
            val ix = lineStart + eolConfiguration.boundaryLength
            if (eolConfiguration.isEndOfLine(input, ix))
              parseHeaderLines(input, ix + eolConfiguration.eolLength, headers, headerCount, None)
            else if (doubleDash(input, ix)) setShouldTerminate()
            else fail("Illegal multipart boundary in message content")

          case EmptyHeader => parseEntity(headers.toList, contentType)(input, lineEnd)

          case h: `Content-Type` =>
            if (cth.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, Some(h))
            else if (cth.get == h) parseHeaderLines(input, lineEnd, headers, headerCount, cth)
            else fail("multipart part must not contain more than one Content-Type header")

          case h if headerCount < maxHeaderCount =>
            parseHeaderLines(input, lineEnd, headers += h, headerCount + 1, cth)

          case _ => fail(s"multipart part contains more than the configured limit of $maxHeaderCount headers")
        }
      }

      // work-around for compiler complaining about non-tail-recursion if we inline this method
      def parseHeaderLinesAux(headers: ListBuffer[HttpHeader], headerCount: Int,
          cth: Option[`Content-Type`])(input: ByteString, lineStart: Int): StateResult =
        parseHeaderLines(input, lineStart, headers, headerCount, cth)

      def parseEntity(headers: List[HttpHeader], contentType: ContentType,
          emitPartChunk: (List[HttpHeader], ContentType, ByteString) => Unit = {
            (headers, ct, bytes) =>
              emit(BodyPartStart(headers,
                entityParts =>
                  HttpEntity.IndefiniteLength(
                    ct,
                    entityParts.collect { case EntityPart(data) => data })))
              emit(bytes)
          },
          emitFinalPartChunk: (List[HttpHeader], ContentType, ByteString) => Unit = {
            (headers, ct, bytes) =>
              emit(BodyPartStart(headers,
                { rest =>
                  StreamUtils.cancelSource(rest)(materializer)
                  HttpEntity.Strict(ct, bytes)
                }))
          })(input: ByteString, offset: Int): StateResult =
        try {
          @tailrec def rec(index: Int): StateResult = {
            val currentPartEnd = eolConfiguration.boyerMoore.nextIndex(input, index)
            def emitFinalChunk() = emitFinalPartChunk(headers, contentType, input.slice(offset, currentPartEnd))
            val needleEnd = currentPartEnd + eolConfiguration.needle.length
            if (eolConfiguration.isEndOfLine(input, needleEnd)) {
              emitFinalChunk()
              // Need to trampoline here, otherwise we have a mutual tail recursion between parseHeaderLines and
              // parseEntity that is not tail-call optimized away and may lead to stack overflows on big chunks of data
              // containing many parts.
              trampoline(parseHeaderLines(input, needleEnd + eolConfiguration.eolLength))
            } else if (doubleDash(input, needleEnd)) {
              emitFinalChunk()
              setShouldTerminate()
            } else rec(needleEnd)
          }
          rec(offset)
        } catch {
          case NotEnoughDataException =>
            // we cannot emit all input bytes since the end of the input might be the start of the next boundary
            val emitEnd = input.length - eolConfiguration.needle.length - eolConfiguration.eolLength
            if (emitEnd > offset) {
              emitPartChunk(headers, contentType, input.slice(offset, emitEnd))
              val simpleEmit: (List[HttpHeader], ContentType, ByteString) => Unit = (_, _, bytes) => emit(bytes)
              continue(input.drop(emitEnd), 0)(parseEntity(null, null, simpleEmit, simpleEmit))
            } else continue(input, offset)(parseEntity(headers, contentType, emitPartChunk, emitFinalPartChunk))
        }

      def emit(bytes: ByteString): Unit = if (bytes.nonEmpty) emit(EntityPart(bytes))

      def emit(element: Output): Unit = output = output.enqueue(element)

      def dequeue(): Output = {
        val head = output.head
        output = output.tail
        head
      }

      def continue(input: ByteString, offset: Int)(next: (ByteString, Int) => StateResult): StateResult = {
        state =
          math.signum(offset - input.length) match {
            case -1 => more => next(input ++ more, offset)
            case 0  => next(_, 0)
            case 1  => throw new IllegalStateException
          }
        done()
      }

      def fail(summary: String): StateResult = fail(ErrorInfo(summary))

      def fail(info: ErrorInfo): StateResult = {
        emit(ParseError(info))
        setShouldTerminate()
      }

      def setShouldTerminate(): StateResult = {
        shouldTerminate = true
        done()
      }

      def done(): StateResult = null // StateResult is a phantom type

      def doubleDash(input: ByteString, offset: Int): Boolean =
        byteChar(input, offset) == '-' && byteChar(input, offset + 1) == '-'
    }