in simple-codecs/src/main/scala/org/apache/pekko/stream/connectors/recordio/impl/RecordIOFramingStage.scala [40:116]
/** Attributes applied to this stage by default: it is named "recordIOFraming". */
override def initialAttributes: Attributes = {
  val stageName = "recordIOFraming"
  name(stageName)
}
/** Fixed, human-readable label for this stage. */
override def toString: String = {
  "RecordIOFraming"
}
// Upper bound on the number of characters in a record's size prefix: the number of
// characters needed to render `maxRecordLength` itself as a string.
private val maxRecordPrefixLength: Int = s"$maxRecordLength".length
/**
 * Builds the stage logic that splits the incoming byte stream into records.
 *
 * The logic repeatedly reads a textual size prefix terminated by a `LineFeed` byte, then
 * emits exactly that many following bytes as one record. Bytes matched by `isWhitespace`
 * are dropped before each size prefix and after each emitted record.
 *
 * The stage fails with a `FramingException` when:
 *  - no line feed is found and the buffered prefix is longer than `maxRecordPrefixLength`,
 *  - the parsed record size exceeds `maxRecordLength` or is negative,
 *  - upstream finishes while a size prefix or a record is still incomplete.
 *
 * If the size prefix cannot be parsed as an `Int`, the stage fails with the parsing
 * exception itself.
 *
 * @param inheritedAttributes attributes passed in by the materializer (not used here)
 * @return the handler logic registered on both `in` and `out`
 */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) with InHandler with OutHandler {
    // Bytes received from upstream that have not been emitted yet.
    private var buffer = ByteString.empty

    // The byte length of the next record, if its size prefix has already been parsed.
    private var currentRecordLength: Option[Int] = None

    // Drops leading bytes matched by `isWhitespace` from the buffer.
    private def trimWhitespace(): Unit = buffer = buffer.dropWhile(isWhitespace)

    override def onPush(): Unit = {
      buffer ++= grab(in)
      doParse()
    }

    override def onPull(): Unit = doParse()

    override def onUpstreamFinish(): Unit =
      // An empty buffer alone is not enough to complete: if a size prefix has already been
      // parsed, a record is still outstanding and the stream ended before its bytes arrived.
      // In that case fall through so that doParse()/tryPull() report the truncation.
      if (buffer.isEmpty && currentRecordLength.isEmpty) {
        completeStage()
      } else if (isAvailable(out)) {
        doParse()
      } // else swallow the termination and wait for pull

    // Requests more input; if upstream is already closed the pending data can never be
    // completed, so the stage is failed instead.
    private def tryPull(): Unit =
      if (isClosed(in)) {
        failStage(new FramingException("Stream finished but there was a truncated final record in the buffer."))
      } else pull(in)

    // Advances the parser as far as the buffered bytes allow: emits one record, asks for
    // more input, completes, or fails the stage.
    @tailrec
    private def doParse(): Unit =
      currentRecordLength match {
        case Some(length) if buffer.size >= length =>
          // The whole record is buffered: emit it and reset for the next size prefix.
          val (record, buf) = buffer.splitAt(length)
          buffer = buf.compact
          trimWhitespace()
          currentRecordLength = None
          push(out, record.compact)
        case Some(_) =>
          // The record size is known but not all of its bytes have arrived yet.
          tryPull()
        case None =>
          trimWhitespace()
          buffer.indexOf(LineFeed) match {
            case -1 if buffer.size > maxRecordPrefixLength =>
              failStage(new FramingException(s"Record size prefix is longer than $maxRecordPrefixLength bytes."))
            case -1 if isClosed(in) && buffer.isEmpty =>
              // Nothing left to parse and nothing more will arrive.
              completeStage()
            case -1 =>
              tryPull()
            case lfPos =>
              // Bytes before the line feed are the size prefix; the line feed itself is dropped.
              val (recordSizePrefix, buf) = buffer.splitAt(lfPos)
              buffer = buf.drop(1).compact
              Try(recordSizePrefix.utf8String.toInt) match {
                case Success(length) if length > maxRecordLength =>
                  failStage(
                    new FramingException(
                      s"Record of size $length bytes exceeds maximum of $maxRecordLength bytes."))
                case Success(length) if length < 0 =>
                  failStage(new FramingException(s"Record size prefix $length is negative."))
                case Success(length) =>
                  currentRecordLength = Some(length)
                  // The record body may already be buffered, so continue parsing right away.
                  doParse()
                case Failure(ex) =>
                  failStage(ex)
              }
          }
      }

    setHandlers(in, out, this)
  }