in servicetalk-grpc-protobuf/src/main/java/io/servicetalk/grpc/protobuf/ProtoBufSerializationProvider.java [127:224]
public Iterable<T> deserialize(Buffer toDeserialize) {
    if (toDeserialize.readableBytes() <= 0) {
        return emptyList(); // We don't have any additional data to process, so bail for now.
    }
    List<T> parsedData = null;
    for (;;) {
        toDeserialize = addToAccumulateIfAccumulating(toDeserialize);
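        // lengthOfData < 0 means the header of the next Length-Prefixed-Message has not been read yet.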
        if (lengthOfData < 0) {
            // If we don't have at least a full header, just bail and try again later when more data arrives.
            if (toDeserialize.readableBytes() < LENGTH_PREFIXED_MESSAGE_HEADER_BYTES) {
                return addToAccumulateIfRequiredAndReturn(toDeserialize, parsedData);
            }
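            // The first header byte is the compressed-flag; the next four bytes are the message length.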
            compressed = isCompressed(toDeserialize);
            // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md specifies the size as a 4-byte
            // unsigned int. However, Netty buffers only support up to Integer.MAX_VALUE, and even
            // grpc-java (Google's implementation) only supports up to Integer.MAX_VALUE, so for
            // simplicity we just use a signed int for now.
            lengthOfData = toDeserialize.readInt();
            if (lengthOfData < 0) {
                throw new SerializationException("Message-Length invalid: " + lengthOfData);
            }
        } else {
            if (toDeserialize.readableBytes() < lengthOfData) {
                return addToAccumulateIfRequiredAndReturn(toDeserialize, parsedData);
            }
            final T t;
            try {
                final CodedInputStream in;
                Buffer buffer = toDeserialize;
                int decodedLengthOfData = lengthOfData;
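                // If the compressed-flag was set, decompress the framed message before parsing it.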
                if (compressed) {
                    buffer = codec.decode(toDeserialize.readSlice(lengthOfData), DEFAULT_ALLOCATOR);
                    decodedLengthOfData = buffer.readableBytes();
                }
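                // Hand the buffer's NIO view(s) to the protobuf parser directly to avoid copying the payload.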
                if (buffer.nioBufferCount() == 1) {
                    ByteBuffer nioBuffer = buffer.toNioBuffer(buffer.readerIndex(), decodedLengthOfData);
                    in = CodedInputStream.newInstance(nioBuffer);
                } else {
                    // An aggregated payload body may consist of multiple Buffers. In this case,
                    // CompositeBuffer.toNioBuffer(idx, length) may return a single ByteBuffer (when the
                    // requested length < components[0].length) or create a new ByteBuffer and copy multiple
                    // components into it. Later, the proto parser would copy data from this temporary
                    // ByteBuffer again. To avoid unnecessary copying, we use
                    // newCodedInputStream(buffers, lengthOfData).
                    final ByteBuffer[] buffers = buffer.toNioBuffers(buffer.readerIndex(),
                            decodedLengthOfData);
                    in = buffers.length == 1 ?
                            CodedInputStream.newInstance(buffers[0]) :
                            newCodedInputStream(buffers, decodedLengthOfData);
                }
                t = parser.parseFrom(in);
            } catch (InvalidProtocolBufferException e) {
                throw new SerializationException(e);
            }
            if (!compressed) {
                // The NIO buffer indexes are not connected to the Buffer indexes, so we need to update
                // our indexes and discard any bytes if necessary.
                toDeserialize.skipBytes(lengthOfData);
            }
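            // If we were parsing from the accumulating buffer, let it reclaim space for bytes already consumed.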
            if (toDeserialize == accumulate) {
                accumulate.discardSomeReadBytes();
            }
            // We parsed the expected data; update the state to prepare for parsing the next frame.
            final int oldLengthOfData = lengthOfData;
            lengthOfData = -1;
            compressed = false;
            // If we don't have at least a full header, just bail and try again later when more data arrives.
            if (toDeserialize.readableBytes() < LENGTH_PREFIXED_MESSAGE_HEADER_BYTES) {
                // Before we bail out, we need to save the accumulated data for next time.
                if (toDeserialize != accumulate && toDeserialize.readableBytes() != 0) {
                    accumulate.addBuffer(toDeserialize, true);
                }
                if (parsedData == null) {
                    return singletonList(t);
                }
                parsedData.add(t);
                return parsedData;
            } else {
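                // More data remains, so collect this message and loop around to parse the next frame.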
                if (parsedData == null) {
                    // Assume roughly uniform message sizes when estimating the initial size of the array.
                    parsedData = new ArrayList<>(1 + max(1, toDeserialize.readableBytes() /
                            (oldLengthOfData + LENGTH_PREFIXED_MESSAGE_HEADER_BYTES)));
                }
                parsedData.add(t);
            }
        }
    }
}
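
For reference, a minimal sketch (not from this file) of the Length-Prefixed-Message frame that this loop
consumes, assuming the ServiceTalk Buffer API and the hypothetical names "allocator" (a BufferAllocator)
and "messageBytes" (an uncompressed, serialized protobuf message):

    // Length-Prefixed-Message = 1-byte compressed-flag + 4-byte big-endian Message-Length + message bytes.
    Buffer frame = allocator.newBuffer(LENGTH_PREFIXED_MESSAGE_HEADER_BYTES + messageBytes.length);
    frame.writeByte(0);                    // compressed-flag: 0 = uncompressed
    frame.writeInt(messageBytes.length);   // Message-Length of the payload that follows
    frame.writeBytes(messageBytes);        // the serialized message itself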