in stream/src/main/scala/org/apache/pekko/stream/scaladsl/Framing.scala [222:373]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
// First byte of the separator; the search phase scans the buffer for this byte to find candidate positions.
private val firstSeparatorByte = separatorBytes.head
// Bytes received from upstream that have not been emitted as frames yet.
private var buffer = ByteString.empty
// Offset into `buffer` from which the next separator search starts.
private var nextPossibleMatch = 0
// We use an efficient but unsafe array implementation here; it must be used with caution.
// It holds the (start, end) offsets of every frame found during the search phase.
// The capacity is fixed at 256 to preserve fairness and prevent unnecessary allocation during the parsing phase.
// The array does no bounds checking on append, so its remaining capacity (`isFull`) must be checked
// to prevent an out-of-bounds exception.
// In this use case, we compute all possible indices, up to 256 of them, and then parse everything.
private val indices = new LightArray[(Int, Int)](256)
// A new chunk arrived from upstream: append it to the pending bytes and look for frames.
override def onPush(): Unit = {
  val chunk = grab(in)
  buffer = buffer ++ chunk
  searchIndices()
}
// Downstream demand: look for frames in the bytes that are already buffered.
override def onPull(): Unit = {
  searchIndices()
}
// Upstream completed: finish right away when nothing is buffered; otherwise keep working on the
// remaining bytes, but only if downstream is currently waiting for an element.
override def onUpstreamFinish(): Unit =
  if (!buffer.isEmpty) {
    // When `out` has not been pulled, the termination is swallowed and handled on the next pull.
    if (isAvailable(out)) searchIndices()
  } else {
    completeStage()
  }
// Asks upstream for more bytes. When upstream is already closed, the buffered remainder is either
// emitted as a final (truncated) frame or reported as a framing failure, depending on `allowTruncation`.
private def tryPull(): Unit =
  if (!isClosed(in)) pull(in)
  else if (allowTruncation) {
    push(out, buffer)
    completeStage()
  } else {
    failStage(new FramingException("Stream finished but there was a truncated final frame in the buffer"))
  }
/**
 * Search phase: scans `buffer` for separator occurrences, starting at `nextPossibleMatch`, and
 * records the (start, end) offsets of every complete frame in `indices`.
 *
 * The scan stops and hands over to `doParse()` when the buffer holds no further candidate, when a
 * candidate is cut off by the end of the buffer, when the buffer ends exactly after a separator,
 * or when `indices` is full. The stage is failed with a `FramingException` as soon as the frame
 * being scanned exceeds `maximumLineBytes`.
 */
@tailrec
private def searchIndices(): Unit = {
  // Next possible position for the delimiter (-1 when its first byte does not occur any more)
  val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)

  // Retrieve the start of the frame being scanned: just past the last separator recorded so far
  val previous = indices.lastOption match {
    case OptionVal.Some((_, i)) => i + separatorBytes.size
    case _                      => 0
  }

  if (possibleMatchPos - previous > maximumLineBytes) {
    failStage(
      new FramingException(
        s"Read ${possibleMatchPos - previous} bytes " +
        s"which is more than $maximumLineBytes without seeing a line terminator"))
  } else if (possibleMatchPos == -1) {
    if (buffer.length - previous > maximumLineBytes)
      failStage(
        new FramingException(
          s"Read ${buffer.length - previous} bytes " +
          s"which is more than $maximumLineBytes without seeing a line terminator"))
    else {
      // No matching character, we need to accumulate more bytes into the buffer
      nextPossibleMatch = buffer.length
      doParse()
    }
  } else if (possibleMatchPos + separatorBytes.length > buffer.length) {
    // We have found a possible match (we found the first character of the terminator
    // sequence) but we don't have enough bytes yet. We remember the position to
    // retry from next time.
    nextPossibleMatch = possibleMatchPos
    doParse()
  } else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.length) == separatorBytes) {
    // Found a match, mark start and end position and iterate if possible
    indices += (previous -> possibleMatchPos)
    nextPossibleMatch = possibleMatchPos + separatorBytes.length
    if (nextPossibleMatch == buffer.length || indices.isFull) {
      doParse()
    } else {
      searchIndices()
    }
  } else {
    // possibleMatchPos was not actually a match. Resume right after the rejected candidate:
    // advancing the start offset by only one byte would make `indexOf` find the very same
    // position again on every iteration until the offset finally moved past it.
    nextPossibleMatch = possibleMatchPos + 1
    searchIndices()
  }
}
// Parse phase: emits the frames recorded in `indices`. With no frame recorded, more input is
// requested instead; a single frame is pushed directly; several frames are emitted one by one.
private def doParse(): Unit = {
  // Drops the emitted bytes and completes the stage once upstream is closed and nothing is left.
  def resetAndCompleteIfDrained(): Unit = {
    reset()
    if (isClosed(in) && buffer.isEmpty) completeStage()
  }

  indices.length match {
    case 0 =>
      tryPull()
    case 1 =>
      // Emit result and compact buffer
      val (start, stop) = indices(0)
      push(out, buffer.slice(start, stop).compact)
      resetAndCompleteIfDrained()
    case _ =>
      // Emit results and compact buffer once the last frame has been emitted
      emitMultiple(out, new FrameIterator(), () => resetAndCompleteIfDrained())
  }
}
// Discards everything up to and including the last recorded separator, then clears the search
// state so that the next search starts from the beginning of the remaining bytes.
private def reset(): Unit = {
  // Number of leading bytes covered by the recorded frames and their separators
  val consumed =
    if (indices.length > 0) indices(indices.length - 1)._2 + separatorBytes.size
    else 0
  indices.setLength(0)
  nextPossibleMatch = 0
  buffer = buffer.drop(consumed).compact
}
// Iterator over the precomputed frames: each element is the compacted slice of `buffer`
// delimited by the next recorded (start, end) pair in `indices`.
private class FrameIterator(private var index: Int = 0) extends Iterator[ByteString] {
  def hasNext: Boolean = indices.length != index

  def next(): ByteString = {
    val (start, stop) = indices(index)
    val frame = buffer.slice(start, stop).compact
    index += 1
    frame
  }
}
// Minimal fixed-capacity array whose logical length can be changed without any checks.
// Appending does not verify the capacity: callers are expected to consult `isFull` first.
private class LightArray[T: ClassTag](private val capacity: Int, private var index: Int = 0) {

  private val underlying = new Array[T](capacity)

  // Element stored at position `i` (not checked against the logical length)
  def apply(i: Int): T = underlying(i)

  // Stores `el` at the current end and grows the logical length by one
  def +=(el: T): Unit = {
    underlying(index) = el
    index += 1
  }

  def length: Int = index

  def isEmpty: Boolean = length == 0

  def isFull: Boolean = capacity == length

  // Overrides the logical length; the stored elements themselves are left untouched
  def setLength(length: Int): Unit = {
    index = length
  }

  // Most recently appended element, if any
  def lastOption: OptionVal[T] =
    if (index <= 0) OptionVal.none
    else OptionVal.Some(underlying(index - 1))
}
// This logic mixes in both InHandler and OutHandler, so it registers itself for `in` and `out`.
setHandlers(in, out, this)
}