in runtime/src/main/scala/org/apache/pekko/grpc/internal/AbstractGrpcProtocol.scala [146:180]
/**
 * Creates the parsing logic that cuts the incoming bytes into length-prefixed frames.
 *
 * The parser alternates between two steps:
 *  - `ReadFrameHeader` reads one frame-type byte followed by a big-endian `Int` payload length;
 *  - `ReadFrame` takes exactly that many payload bytes, uncompresses them when the lowest bit
 *    of the frame type is set, and emits the result of `deframe`.
 *
 * A third step, `Failed`, is entered after the stage has been failed explicitly and never
 * produces any further output.
 *
 * @param inheritedAttributes attributes handed over by the materializer (not used here)
 * @return the stage logic driving the frame parser
 */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new ParsingLogic {
    startWith(ReadFrameHeader)

    trait Step extends ParseStep[Frame]

    /** Reads the frame header and decides how the payload is going to be consumed. */
    object ReadFrameHeader extends Step {
      override def parse(reader: ByteReader): ParseResult[Frame] = {
        val frameType = reader.readByte()
        // The length prefix is read as a signed Int, so a prefix above Int.MaxValue (> 2GB)
        // shows up here as a negative number. Supporting such frames would require reading
        // the prefix as unsigned.
        val length = reader.readIntBE()

        if (length < 0) {
          // Either the sender announced a frame too large to represent or the header is
          // corrupt. Reject it up front instead of handing a negative count to `reader.take`.
          // Failing the stage explicitly (rather than throwing) mirrors how `ReadFrame`
          // reports a StatusException.
          failStage(
            new StatusException(
              io.grpc.Status.RESOURCE_EXHAUSTED.withDescription(
                s"gRPC message exceeds maximum size ${Int.MaxValue}: ${length & 0xFFFFFFFFL}")))
          ParseResult(None, Failed)
        } else if (length == 0)
          // Nothing to wait for: an empty frame can be emitted straight away.
          ParseResult(Some(deframe(frameType, ByteString.empty)), ReadFrameHeader)
        else
          // A payload is pending, so upstream finishing at this point is not accepted.
          ParseResult(None, ReadFrame(frameType, length), acceptUpstreamFinish = false)
      }
    }

    /**
     * Reads the payload announced by the preceding header.
     *
     * @param frameType the frame-type byte from the header; its lowest bit flags compression
     * @param length    the number of payload bytes to take from the reader
     */
    sealed case class ReadFrame(frameType: Int, length: Int) extends Step {
      private val compression = (frameType & 0x01) == 1

      override def parse(reader: ByteReader): ParseResult[Frame] =
        try ParseResult(
            Some(deframe(frameType, codec.uncompress(compression, reader.take(length)))),
            ReadFrameHeader)
        catch {
          case s: StatusException =>
            failStage(s) // handle explicitly to avoid noisy log
            ParseResult(None, Failed)
        }
    }

    /** Step used once the stage has been failed: it emits nothing and stays in place. */
    case object Failed extends Step {
      override def parse(reader: ByteReader): ParseResult[Frame] = ParseResult(None, Failed)
    }
  }