in flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java [379:485]
private InputStream asInputStream() {
  // Serializes this message into a stream of FlightData wire bytes. Messages without a body
  // (pure metadata, or a schema) are built with the FlightData builder and streamed through
  // NO_BODY_MARSHALLER. Record and dictionary batches are framed by hand with a
  // CodedOutputStream so that the body buffers can be attached as components of a
  // CompositeByteBuf. Any exception raised while framing is rethrown wrapped in a
  // RuntimeException.
  if (message == null) {
    // If we have no IPC message, it's a pure-metadata message
    final FlightData.Builder builder = FlightData.newBuilder();
    if (descriptor != null) {
      builder.setFlightDescriptor(descriptor);
    }
    if (appMetadata != null) {
      builder.setAppMetadata(ByteString.copyFrom(appMetadata.nioBuffer()));
    }
    return NO_BODY_MARSHALLER.stream(builder.build());
  }
  try {
    final ByteString bytes =
        ByteString.copyFrom(message.getMessageBuffer(), message.bytesAfterMessage());
    if (getMessageType() == HeaderType.SCHEMA) {
      final FlightData.Builder builder = FlightData.newBuilder().setDataHeader(bytes);
      if (descriptor != null) {
        builder.setFlightDescriptor(descriptor);
      }
      // A schema message carries no body buffers.
      Preconditions.checkArgument(bufs.isEmpty());
      return NO_BODY_MARSHALLER.stream(builder.build());
    }
    Preconditions.checkArgument(
        getMessageType() == HeaderType.RECORD_BATCH
            || getMessageType() == HeaderType.DICTIONARY_BATCH);
    // There may be no buffers in the case that we write only a null array
    Preconditions.checkArgument(
        descriptor == null, "Descriptor should only be included in the schema message.");

    // Write the header fields (data header, optional app metadata, and the tag of the body
    // field) into a small heap buffer; the body itself is appended below as separate buffers.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CodedOutputStream cos = CodedOutputStream.newInstance(baos);
    cos.writeBytes(FlightData.DATA_HEADER_FIELD_NUMBER, bytes);
    if (appMetadata != null && appMetadata.capacity() > 0) {
      // Must call slice() as CodedOutputStream#writeByteBuffer writes -capacity- bytes, not
      // -limit- bytes
      cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER, appMetadata.nioBuffer().slice());
    }
    cos.writeTag(FlightData.DATA_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);

    // Total the padded body length in a long BEFORE retaining any buffer. The length prefix
    // written below is a 32-bit value; summing in an int would silently wrap for bodies
    // larger than Integer.MAX_VALUE and produce a corrupt length prefix. Failing here, before
    // any retain() call, means the error path leaves no reference counts raised.
    long paddedBodyLength = 0;
    for (ArrowBuf b : bufs) {
      final long readable = b.readableBytes();
      paddedBodyLength += readable;
      if (readable % 8 != 0) {
        paddedBodyLength += 8 - (readable % 8);
      }
    }
    if (paddedBodyLength > Integer.MAX_VALUE) {
      throw new IllegalStateException(
          "Message body of "
              + paddedBodyLength
              + " bytes exceeds the maximum encodable size of "
              + Integer.MAX_VALUE
              + " bytes");
    }
    final int size = (int) paddedBodyLength;

    List<ByteBuf> allBufs = new ArrayList<>();
    for (ArrowBuf b : bufs) {
      // [ARROW-11066] This creates a Netty buffer whose refcnt is INDEPENDENT of the backing
      // Arrow buffer. This is susceptible to use-after-free, so we subclass CompositeByteBuf
      // below to tie the Arrow buffer refcnt to the Netty buffer refcnt
      allBufs.add(Unpooled.wrappedBuffer(b.nioBuffer()).retain());
      // [ARROW-4213] These buffers must be aligned to an 8-byte boundary in order to be readable
      // from C++.
      if (b.readableBytes() % 8 != 0) {
        int paddingBytes = (int) (8 - (b.readableBytes() % 8));
        assert paddingBytes > 0 && paddingBytes < 8;
        allBufs.add(PADDING_BUFFERS.get(paddingBytes).retain());
      }
    }
    // rawvarint is used for length definition.
    cos.writeUInt32NoTag(size);
    cos.flush();

    ByteBuf initialBuf = Unpooled.buffer(baos.size());
    initialBuf.writeBytes(baos.toByteArray());
    final CompositeByteBuf bb;
    final ImmutableList<ByteBuf> byteBufs =
        ImmutableList.<ByteBuf>builder().add(initialBuf).addAll(allBufs).build();
    // See: https://github.com/apache/arrow/issues/40039
    // CompositeByteBuf requires us to pass maxNumComponents to the constructor.
    // This number is used to decide when to stop adding new components as separate buffers
    // and instead merge existing components into a new buffer by performing a memory copy.
    // We want to avoid memory copies as much as possible, so we want to set a limit that won't
    // be reached.
    // At first glance it seems reasonable to set the limit to byteBufs.size() + 1,
    // because that is enough to avoid merges of the byteBufs that we pass to the constructor.
    // But later this buffer will be written to the socket by Netty,
    // and DefaultHttp2ConnectionEncoder uses CoalescingBufferQueue to combine small buffers
    // into one.
    // Method CoalescingBufferQueue.compose checks whether the current buffer is already a
    // CompositeByteBuf, and if so it just adds a new component to this buffer.
    // In our case, if we set maxNumComponents=byteBufs.size() + 1, the limit would be hit on
    // the first attempt to write data to the socket, because the header message is small and
    // Netty will always try to combine it with the large CompositeByteBuf we're creating here.
    // We never want additional memory copies, so we set the limit to Integer.MAX_VALUE.
    final int maxNumComponents = Integer.MAX_VALUE;
    if (tryZeroCopyWrite) {
      bb = new ArrowBufRetainingCompositeByteBuf(maxNumComponents, byteBufs, bufs);
    } else {
      // Don't retain the buffers in the non-zero-copy path since we're copying them
      bb =
          new CompositeByteBuf(
              UnpooledByteBufAllocator.DEFAULT, /* direct */ true, maxNumComponents, byteBufs);
    }
    return new DrainableByteBufInputStream(bb, tryZeroCopyWrite);
  } catch (Exception ex) {
    throw new RuntimeException("Unexpected IO Exception", ex);
  }
}