public Iterable deserialize()

in servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProvider.java [127:224]


        /**
         * Decodes every complete length-prefixed message currently available and returns them in arrival order.
         * <p>
         * Decoding alternates between two phases tracked by the {@code lengthOfData} field: while it is negative a
         * frame header (compression flag followed by a 4-byte length) is expected; once it is non-negative the
         * payload of that size is expected. When not enough bytes are readable for the current phase the method
         * returns what was parsed so far, delegating to {@code addToAccumulateIfRequiredAndReturn} (defined outside
         * this block; presumably it retains the unread bytes for the next invocation).
         *
         * @param toDeserialize the newly arrived bytes to decode.
         * @return the messages parsed during this invocation; an empty list if {@code toDeserialize} has no
         * readable bytes.
         * @throws SerializationException if a frame declares a negative length or the protobuf parser rejects the
         * payload.
         */
        public Iterable<T> deserialize(Buffer toDeserialize) {
            if (toDeserialize.readableBytes() <= 0) {
                // Nothing new arrived, so there is nothing to decode yet.
                return emptyList();
            }

            List<T> messages = null;
            Buffer source = toDeserialize;

            while (true) {
                source = addToAccumulateIfAccumulating(source);

                if (lengthOfData < 0) {
                    // Header phase: wait until the complete header is readable.
                    if (source.readableBytes() < LENGTH_PREFIXED_MESSAGE_HEADER_BYTES) {
                        return addToAccumulateIfRequiredAndReturn(source, messages);
                    }

                    compressed = isCompressed(source);

                    // The gRPC wire format (https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md) defines
                    // the length as a 4-byte unsigned int. Netty buffers and grpc-java are both capped at
                    // Integer.MAX_VALUE, so a signed int is read and negative values are rejected.
                    lengthOfData = source.readInt();
                    if (lengthOfData < 0) {
                        throw new SerializationException("Message-Length invalid: " + lengthOfData);
                    }
                    continue;
                }

                // Payload phase: wait until the whole frame body is readable.
                if (source.readableBytes() < lengthOfData) {
                    return addToAccumulateIfRequiredAndReturn(source, messages);
                }

                final int frameLength = lengthOfData;
                final T message;
                try {
                    final Buffer payload;
                    final int payloadLength;
                    if (compressed) {
                        // readSlice advances the reader index of source past the compressed frame.
                        payload = codec.decode(source.readSlice(frameLength), DEFAULT_ALLOCATOR);
                        payloadLength = payload.readableBytes();
                    } else {
                        payload = source;
                        payloadLength = frameLength;
                    }

                    final CodedInputStream in;
                    if (payload.nioBufferCount() != 1) {
                        // An aggregated payload may span several Buffers. Asking for a single ByteBuffer could
                        // force the components to be copied into a temporary one, which the proto parser would
                        // then copy again. Handing over the individual ByteBuffers avoids that extra copy.
                        final ByteBuffer[] parts = payload.toNioBuffers(payload.readerIndex(), payloadLength);
                        if (parts.length == 1) {
                            in = CodedInputStream.newInstance(parts[0]);
                        } else {
                            in = newCodedInputStream(parts, payloadLength);
                        }
                    } else {
                        in = CodedInputStream.newInstance(
                                payload.toNioBuffer(payload.readerIndex(), payloadLength));
                    }

                    message = parser.parseFrom(in);
                } catch (InvalidProtocolBufferException e) {
                    throw new SerializationException(e);
                }

                if (!compressed) {
                    // Reading through the NIO view does not move the Buffer's own reader index, so advance it
                    // manually past the frame that was just parsed.
                    source.skipBytes(frameLength);
                }

                if (source == accumulate) {
                    accumulate.discardSomeReadBytes();
                }

                // The frame is consumed; reset the state so the next iteration expects a header again.
                lengthOfData = -1;
                compressed = false;

                if (source.readableBytes() >= LENGTH_PREFIXED_MESSAGE_HEADER_BYTES) {
                    // Another header is already readable, keep decoding.
                    if (messages == null) {
                        // Size the list under the assumption that remaining frames are about as large as this one.
                        messages = new ArrayList<>(1 + max(1, source.readableBytes() /
                                (frameLength + LENGTH_PREFIXED_MESSAGE_HEADER_BYTES)));
                    }
                    messages.add(message);
                    continue;
                }

                // Fewer bytes than a header remain: stash any leftover bytes for the next call and return.
                if (source != accumulate && source.readableBytes() != 0) {
                    accumulate.addBuffer(source, true);
                }
                if (messages == null) {
                    return singletonList(message);
                }
                messages.add(message);
                return messages;
            }
        }