def writer()

in runtime/src/main/scala/org/apache/pekko/grpc/internal/AbstractGrpcProtocol.scala [108:143]


  /**
   * Assembles a [[GrpcProtocolWriter]] from the supplied encoding functions.
   *
   * The first value handed to the writer is `protocol.contentType` after it has been passed,
   * together with `codec`, through `adjustCompressibility`. The last value is a flow that
   * applies `encodeFrame` to every [[Frame]] travelling through it.
   *
   * @param protocol             protocol whose content type is advertised by the writer
   * @param codec                codec forwarded to `adjustCompressibility` and to the writer
   * @param encodeFrame          turns a single frame into a chunk stream part
   * @param encodeDataToResponse turns a payload, headers and a trailer into a complete response
   * @return the writer built from the values above
   */
  def writer(
      protocol: GrpcProtocol,
      codec: Codec,
      encodeFrame: Frame => ChunkStreamPart,
      encodeDataToResponse: (ByteString, immutable.Seq[HttpHeader], Trailer) => HttpResponse): GrpcProtocolWriter = {
    val writerContentType = adjustCompressibility(protocol.contentType, codec)
    // Streaming counterpart of `encodeFrame`: one output element per incoming frame.
    val frameEncodingFlow = Flow[Frame].map(encodeFrame)

    GrpcProtocolWriter(writerContentType, codec, encodeFrame, encodeDataToResponse, frameEncodingFlow)
  }

  /**
   * Assembles a [[GrpcProtocolReader]] offering both a strict (single `ByteString`) decoder and
   * a streaming decoder based on [[GrpcFramingDecoderStage]].
   *
   * @param codec           codec used to uncompress payloads; also handed to the framing stage
   * @param decodeFrame     frame factory handed to the framing stage
   * @param preDecodeStrict optional transformation applied to the bytes before strict decoding;
   *                        `null` (the default) means the bytes are used as they are
   * @param preDecodeFlow   optional flow placed in front of the streaming decoder;
   *                        `null` (the default) means no extra stage is added
   * @return the reader built from the codec and the two decoders
   */
  def reader(
      codec: Codec,
      decodeFrame: (Int, ByteString) => Frame,
      preDecodeStrict: ByteString => ByteString = null,
      preDecodeFlow: Flow[ByteString, ByteString, NotUsed] = null): GrpcProtocolReader = {
    // The `null` defaults are part of the signature, so "absent" is detected by reference check.
    val prepareStrict: ByteString => ByteString =
      if (preDecodeStrict ne null) preDecodeStrict else identity

    // Decodes exactly one frame from a fully available ByteString.
    // Layout as read below: one frame-type byte, a length read via `readIntBE`, then `length`
    // bytes of payload. Anything left over after the payload is rejected.
    def decodeSingleFrame(bytes: ByteString): ByteString =
      try {
        val input = new ByteReader(prepareStrict(bytes))
        val frameFlags = input.readByte()
        val payload = input.take(input.readIntBE())

        if (input.hasRemaining) throw new IllegalStateException("Unexpected data")
        // Frames with the high bit (0x80) set are not handled by the strict decoder.
        if ((frameFlags & 0x80) != 0) throw new IllegalStateException("Cannot read unknown frame")

        // The lowest bit of the frame-type byte is forwarded as the first argument of `uncompress`.
        codec.uncompress((frameFlags & 1) == 1, payload)
      } catch { case ByteStringParser.NeedMoreData => throw new MissingParameterException }

    val framingFlow: Flow[ByteString, Frame, NotUsed] =
      Flow.fromGraph(new GrpcFramingDecoderStage(codec, decodeFrame))

    // When a pre-decode flow is supplied it runs ahead of the framing stage.
    val streamDecoder: Flow[ByteString, Frame, NotUsed] =
      if (preDecodeFlow eq null) framingFlow
      else Flow[ByteString].via(preDecodeFlow).via(framingFlow)

    GrpcProtocolReader(codec, decodeSingleFrame, streamDecoder)
  }