in runtime/src/main/scala/org/apache/pekko/grpc/internal/AbstractGrpcProtocol.scala [108:143]
def writer(
protocol: GrpcProtocol,
codec: Codec,
encodeFrame: Frame => ChunkStreamPart,
encodeDataToResponse: (ByteString, immutable.Seq[HttpHeader], Trailer) => HttpResponse): GrpcProtocolWriter =
GrpcProtocolWriter(
adjustCompressibility(protocol.contentType, codec),
codec,
encodeFrame,
encodeDataToResponse,
Flow[Frame].map(encodeFrame))
def reader(
codec: Codec,
decodeFrame: (Int, ByteString) => Frame,
preDecodeStrict: ByteString => ByteString = null,
preDecodeFlow: Flow[ByteString, ByteString, NotUsed] = null): GrpcProtocolReader = {
val strictAdapter: ByteString => ByteString = if (preDecodeStrict eq null) identity else preDecodeStrict
val adapter: Flow[ByteString, Frame, NotUsed] => Flow[ByteString, Frame, NotUsed] =
if (preDecodeFlow eq null) identity
else x => Flow[ByteString].via(preDecodeFlow).via(x)
// strict decoder
def decoder(bs: ByteString): ByteString =
try {
val reader = new ByteReader(strictAdapter(bs))
val frameType = reader.readByte()
val length = reader.readIntBE()
val data = reader.take(length)
if (reader.hasRemaining) throw new IllegalStateException("Unexpected data")
if ((frameType & 0x80) == 0) codec.uncompress((frameType & 1) == 1, data)
else throw new IllegalStateException("Cannot read unknown frame")
} catch { case ByteStringParser.NeedMoreData => throw new MissingParameterException }
GrpcProtocolReader(codec, decoder, adapter(Flow.fromGraph(new GrpcFramingDecoderStage(codec, decodeFrame))))
}