in flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java [282:365]
/**
 * Reads one serialized message from {@code stream} and assembles an {@link ArrowMessage}.
 *
 * <p>The stream is consumed entry by entry while {@code stream.available() > 0}: a tag is read
 * with {@code readRawVarint32}, and for the four recognised tags a length follows, then that many
 * payload bytes. If a field appears more than once, the last occurrence wins and any buffer
 * allocated for an earlier occurrence is released.
 *
 * <p>Buffers allocated here are released again if reading or parsing fails, so a malformed or
 * truncated message does not leak allocator memory.
 *
 * @param allocator allocator used for the application-metadata and body buffers
 * @param stream source of the serialized message
 * @return the message built from the fields found; fields that were absent are passed as
 *     {@code null}, except that a missing body is replaced by the allocator's empty buffer for
 *     record-batch and dictionary-batch headers
 * @throws RuntimeException wrapping any exception raised while reading or parsing
 */
private static ArrowMessage frame(BufferAllocator allocator, final InputStream stream) {
  // Declared outside the try block so the failure path below can release them.
  ArrowBuf body = null;
  ArrowBuf appMetadata = null;
  try {
    FlightDescriptor descriptor = null;
    MessageMetadataResult header = null;
    while (stream.available() > 0) {
      int tag = readRawVarint32(stream);
      switch (tag) {
        case DESCRIPTOR_TAG:
          {
            int size = readRawVarint32(stream);
            byte[] bytes = new byte[size];
            ByteStreams.readFully(stream, bytes);
            descriptor = FlightDescriptor.parseFrom(bytes);
            break;
          }
        case HEADER_TAG:
          {
            int size = readRawVarint32(stream);
            byte[] bytes = new byte[size];
            ByteStreams.readFully(stream, bytes);
            header = MessageMetadataResult.create(ByteBuffer.wrap(bytes), size);
            break;
          }
        case APP_METADATA_TAG:
          {
            if (appMetadata != null) {
              // Only keep the last app metadata; release the earlier buffer instead of
              // dropping the reference and leaking it.
              appMetadata.getReferenceManager().release();
              appMetadata = null;
            }
            int size = readRawVarint32(stream);
            appMetadata = allocator.buffer(size);
            GetReadableBuffer.readIntoBuffer(stream, appMetadata, size, ENABLE_ZERO_COPY_READ);
            break;
          }
        case BODY_TAG:
          {
            if (body != null) {
              // only read last body.
              body.getReferenceManager().release();
              body = null;
            }
            int size = readRawVarint32(stream);
            body = allocator.buffer(size);
            GetReadableBuffer.readIntoBuffer(stream, body, size, ENABLE_ZERO_COPY_READ);
            break;
          }
        default:
          // ignore unknown fields.
          // NOTE(review): nothing is read here, so the payload of an unknown field is not
          // consumed and the next loop iteration reads its bytes as a tag - confirm that
          // unknown fields never occur in practice or skip the payload by wire type.
      }
    }
    // Protobuf implementations can omit empty fields, such as body. For some message types,
    // like RecordBatch, this will fail later as we still expect an empty buffer. In those cases
    // only, fill in an empty buffer here; in other cases, like Schema, having an unexpected
    // empty buffer will also cause failures.
    // We don't fill in defaults for fields like header, for which there is no reasonable
    // default, or for appMetadata or descriptor, which are intended to be empty in some cases.
    if (header != null) {
      switch (HeaderType.getHeader(header.headerType())) {
        case SCHEMA:
          // Ignore 0-length buffers in case a Protobuf implementation wrote it out
          if (body != null && body.capacity() == 0) {
            body.close();
            body = null;
          }
          break;
        case DICTIONARY_BATCH:
        case RECORD_BATCH:
          // A Protobuf implementation can skip 0-length bodies, so ensure we fill it in here
          if (body == null) {
            body = allocator.getEmpty();
          }
          break;
        case NONE:
        case TENSOR:
        default:
          // Do nothing
          break;
      }
    }
    return new ArrowMessage(descriptor, header, appMetadata, body);
  } catch (Exception ioe) {
    RuntimeException failure = new RuntimeException(ioe);
    // No ArrowMessage was returned to take these buffers, so release them here. A failure while
    // releasing is attached as suppressed so that it does not hide the original cause.
    for (ArrowBuf buf : new ArrowBuf[] {appMetadata, body}) {
      if (buf == null) {
        continue;
      }
      try {
        buf.close();
      } catch (RuntimeException releaseFailure) {
        failure.addSuppressed(releaseFailure);
      }
    }
    throw failure;
  }
}