override def createLogic()

in runtime/src/main/scala/org/apache/pekko/grpc/internal/AbstractGrpcProtocol.scala [146:180]


    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new ParsingLogic {
        startWith(ReadFrameHeader)

        trait Step extends ParseStep[Frame]

        object ReadFrameHeader extends Step {
          override def parse(reader: ByteReader): ParseResult[Frame] = {
            val frameType = reader.readByte()
            // If we want to support > 2GB frames, this should be unsigned
            val length = reader.readIntBE()

            if (length == 0) ParseResult(Some(deframe(frameType, ByteString.empty)), ReadFrameHeader)
            else ParseResult(None, ReadFrame(frameType, length), acceptUpstreamFinish = false)
          }
        }

        sealed case class ReadFrame(frameType: Int, length: Int) extends Step {
          private val compression = (frameType & 0x01) == 1

          override def parse(reader: ByteReader): ParseResult[Frame] =
            try ParseResult(
                Some(deframe(frameType, codec.uncompress(compression, reader.take(length)))),
                ReadFrameHeader)
            catch {
              case s: StatusException =>
                failStage(s) // handle explicitly to avoid noisy log
                ParseResult(None, Failed)
            }
        }

        case object Failed extends Step {
          override def parse(reader: ByteReader): ParseResult[Frame] = ParseResult(None, Failed)
        }
      }