override def createLogic()

in stream/src/main/scala/org/apache/pekko/stream/scaladsl/Framing.scala [222:373]


    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        // First byte of the delimiter; used to locate candidate delimiter positions in the buffer.
        private val firstSeparatorByte = separatorBytes.head
        // Bytes grabbed from upstream that have not yet been dropped by `reset()`.
        private var buffer = ByteString.empty
        // Offset in `buffer` from which the next search for the delimiter's first byte starts.
        private var nextPossibleMatch = 0

        // We use an efficient but unsafe array implementation that must be used with caution.
        // It contains all the (start, end) frame positions computed during the search phase.
        // The capacity is fixed at 256 to preserve fairness and prevent unnecessary allocation during the parsing phase.
        // The array exposes `isFull`, which must be checked before appending to prevent an out-of-bounds exception.
        // In this use case, we compute up to 256 frame positions and then emit all of them.
        private val indices = new LightArray[(Int, Int)](256)

        // A new chunk arrived from upstream: append it to the pending bytes and look for frames.
        override def onPush(): Unit = {
          val chunk = grab(in)
          buffer = buffer ++ chunk
          searchIndices()
        }

        // Downstream signalled demand: look for frames in what is already buffered.
        override def onPull(): Unit = {
          searchIndices()
        }

        // Upstream completed. With nothing buffered the stage completes immediately; otherwise the
        // remaining bytes are processed now if downstream has demand, or on the next pull if not.
        override def onUpstreamFinish(): Unit =
          if (buffer.nonEmpty) {
            // Without demand we swallow the termination and wait for the next pull.
            if (isAvailable(out)) searchIndices()
          } else completeStage()

        // Asks upstream for more bytes. If upstream is already closed, the buffered remainder is
        // either emitted as a final frame (when truncation is allowed) or the stage is failed.
        private def tryPull(): Unit =
          if (!isClosed(in)) pull(in)
          else if (allowTruncation) {
            push(out, buffer)
            completeStage()
          } else
            failStage(new FramingException("Stream finished but there was a truncated final frame in the buffer"))

        // Scans `buffer` for complete delimiter-terminated frames and records the (start, end)
        // position of each one in `indices`. Scanning stops, and `doParse()` is invoked, when
        //  - no further candidate for the delimiter's first byte exists,
        //  - a candidate is found but the buffer ends before the full delimiter could be compared,
        //  - a frame was recorded and the buffer is exhausted, or
        //  - `indices` is full.
        // The stage is failed with a FramingException when the bytes since the end of the previous
        // frame exceed `maximumLineBytes` without a delimiter having been seen.
        @tailrec
        private def searchIndices(): Unit = {
          // Next possible position for the delimiter
          val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)

          // Retrieve the position right after the previously recorded frame's delimiter,
          // i.e. where the frame currently being searched for starts.
          val previous = indices.lastOption match {
            case OptionVal.Some((_, i)) => i + separatorBytes.size
            case _                      => 0
          }

          if (possibleMatchPos - previous > maximumLineBytes) {
            // Even if this candidate is a real delimiter, the frame before it is already too long.
            // (When there is no candidate, possibleMatchPos is -1 and this branch is not taken.)
            failStage(
              new FramingException(
                s"Read ${possibleMatchPos - previous} bytes " +
                s"which is more than $maximumLineBytes without seeing a line terminator"))
          } else if (possibleMatchPos == -1) {
            if (buffer.length - previous > maximumLineBytes)
              failStage(
                new FramingException(
                  s"Read ${buffer.length - previous} bytes " +
                  s"which is more than $maximumLineBytes without seeing a line terminator"))
            else {
              // No matching character, we need to accumulate more bytes into the buffer
              nextPossibleMatch = buffer.length
              doParse()
            }
          } else if (possibleMatchPos + separatorBytes.length > buffer.length) {
            // We have found a possible match (we found the first character of the terminator
            // sequence) but we don't have yet enough bytes. We remember the position to
            // retry from next time.
            nextPossibleMatch = possibleMatchPos
            doParse()
          } else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.length) == separatorBytes) {
            // Found a match, mark start and end position and iterate if possible
            indices += (previous -> possibleMatchPos)
            nextPossibleMatch = possibleMatchPos + separatorBytes.length
            if (nextPossibleMatch == buffer.length || indices.isFull) {
              doParse()
            } else {
              searchIndices()
            }
          } else {
            // possibleMatchPos was not actually a match. Resume right after it: advancing the old
            // offset by one would only rediscover this same candidate on every iteration until the
            // offset finally moved past it, re-scanning the same bytes each time.
            nextPossibleMatch = possibleMatchPos + 1
            searchIndices()
          }
        }

        // Emits the frames recorded in `indices`. With nothing recorded it asks for more input via
        // `tryPull()`; a single frame is pushed directly; several frames are emitted one after another.
        private def doParse(): Unit = {
          // Drop the emitted bytes and finish the stage once upstream is closed and nothing is left.
          def resetAndMaybeComplete(): Unit = {
            reset()
            if (isClosed(in) && buffer.isEmpty) completeStage()
          }

          indices.length match {
            case 0 =>
              tryPull()
            case 1 =>
              // Emit result and compact buffer
              val frame = indices(0)
              push(out, buffer.slice(frame._1, frame._2).compact)
              resetAndMaybeComplete()
            case _ =>
              // Emit results and compact buffer once every frame has been emitted
              emitMultiple(out, new FrameIterator(), () => resetAndMaybeComplete())
          }
        }

        // Discards every byte up to and including the delimiter of the last recorded frame,
        // then clears the recorded positions and restarts the search offset at the buffer start.
        private def reset(): Unit = {
          val consumed =
            if (indices.isEmpty) 0
            else indices(indices.length - 1)._2 + separatorBytes.size

          indices.setLength(0)
          nextPossibleMatch = 0
          buffer = buffer.drop(consumed).compact
        }

        // Walks the precomputed (start, end) positions in `indices`, yielding each frame as a
        // compacted slice of `buffer`.
        private class FrameIterator(private var index: Int = 0) extends Iterator[ByteString] {
          def hasNext: Boolean = !(index == indices.length)

          def next(): ByteString = {
            val bounds = indices(index)
            val frame = buffer.slice(bounds._1, bounds._2).compact
            index += 1
            frame
          }
        }

        // Minimal fixed-capacity array with a movable logical length. No bounds checks are
        // performed: callers are expected to consult `isFull` before appending.
        private class LightArray[T: ClassTag](private val capacity: Int, private var index: Int = 0) {

          private val underlying = Array.ofDim[T](capacity)

          // Element stored at position `i` (unchecked against the logical length).
          def apply(i: Int): T = underlying(i)

          // Stores `el` at the current logical end and grows the logical length by one.
          def +=(el: T): Unit = {
            underlying(index) = el
            index += 1
          }

          def length: Int = index

          def isEmpty: Boolean = index == 0

          def isFull: Boolean = index == capacity

          // Sets the logical length without touching the stored elements.
          def setLength(length: Int): Unit = {
            index = length
          }

          // Last element within the logical length, if there is one.
          def lastOption: OptionVal[T] =
            if (index > 0) OptionVal.Some(underlying(index - 1))
            else OptionVal.none
        }
        setHandlers(in, out, this)
      }