private InputStream asInputStream()

in flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java [379:485]


  private InputStream asInputStream() {
    if (message == null) {
      // If we have no IPC message, it's a pure-metadata message
      final FlightData.Builder builder = FlightData.newBuilder();
      if (descriptor != null) {
        builder.setFlightDescriptor(descriptor);
      }
      if (appMetadata != null) {
        builder.setAppMetadata(ByteString.copyFrom(appMetadata.nioBuffer()));
      }
      return NO_BODY_MARSHALLER.stream(builder.build());
    }

    try {
      final ByteString bytes =
          ByteString.copyFrom(message.getMessageBuffer(), message.bytesAfterMessage());

      if (getMessageType() == HeaderType.SCHEMA) {

        final FlightData.Builder builder = FlightData.newBuilder().setDataHeader(bytes);

        if (descriptor != null) {
          builder.setFlightDescriptor(descriptor);
        }

        Preconditions.checkArgument(bufs.isEmpty());
        return NO_BODY_MARSHALLER.stream(builder.build());
      }

      Preconditions.checkArgument(
          getMessageType() == HeaderType.RECORD_BATCH
              || getMessageType() == HeaderType.DICTIONARY_BATCH);
      // There may be no buffers in the case that we write only a null array
      Preconditions.checkArgument(
          descriptor == null, "Descriptor should only be included in the schema message.");

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      CodedOutputStream cos = CodedOutputStream.newInstance(baos);
      cos.writeBytes(FlightData.DATA_HEADER_FIELD_NUMBER, bytes);

      if (appMetadata != null && appMetadata.capacity() > 0) {
        // Must call slice() as CodedOutputStream#writeByteBuffer writes -capacity- bytes, not
        // -limit- bytes
        cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER, appMetadata.nioBuffer().slice());
      }

      cos.writeTag(FlightData.DATA_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
      int size = 0;
      List<ByteBuf> allBufs = new ArrayList<>();
      for (ArrowBuf b : bufs) {
        // [ARROW-11066] This creates a Netty buffer whose refcnt is INDEPENDENT of the backing
        // Arrow buffer. This is susceptible to use-after-free, so we subclass CompositeByteBuf
        // below to tie the Arrow buffer refcnt to the Netty buffer refcnt
        allBufs.add(Unpooled.wrappedBuffer(b.nioBuffer()).retain());
        size += (int) b.readableBytes();
        // [ARROW-4213] These buffers must be aligned to an 8-byte boundary in order to be readable
        // from C++.
        if (b.readableBytes() % 8 != 0) {
          int paddingBytes = (int) (8 - (b.readableBytes() % 8));
          assert paddingBytes > 0 && paddingBytes < 8;
          size += paddingBytes;
          allBufs.add(PADDING_BUFFERS.get(paddingBytes).retain());
        }
      }
      // rawvarint is used for length definition.
      cos.writeUInt32NoTag(size);
      cos.flush();

      ByteBuf initialBuf = Unpooled.buffer(baos.size());
      initialBuf.writeBytes(baos.toByteArray());
      final CompositeByteBuf bb;
      final ImmutableList<ByteBuf> byteBufs =
          ImmutableList.<ByteBuf>builder().add(initialBuf).addAll(allBufs).build();
      // See: https://github.com/apache/arrow/issues/40039
      // CompositeByteBuf requires us to pass maxNumComponents to constructor.
      // This number will be used to decide when to stop adding new components as separate buffers
      // and instead merge existing components into a new buffer by performing a memory copy.
      // We want to avoind memory copies as much as possible so we want to set the limit that won't
      // be reached.
      // At a first glance it seems reasonable to set limit to byteBufs.size() + 1,
      // because it will be enough to avoid merges of byteBufs that we pass to constructor.
      // But later this buffer will be written to socket by Netty
      // and DefaultHttp2ConnectionEncoder uses CoalescingBufferQueue to combine small buffers into
      // one.
      // Method CoalescingBufferQueue.compose will check if current buffer is already a
      // CompositeByteBuf
      // and if it's the case it will just add a new component to this buffer.
      // But in out case if we set maxNumComponents=byteBufs.size() + 1 it will happen on the first
      // attempt
      // to write data to socket because header message is small and Netty will always try to
      // compine it with the
      // large CompositeByteBuf we're creating here.
      // We never want additional memory copies so setting the limit to Integer.MAX_VALUE
      final int maxNumComponents = Integer.MAX_VALUE;
      if (tryZeroCopyWrite) {
        bb = new ArrowBufRetainingCompositeByteBuf(maxNumComponents, byteBufs, bufs);
      } else {
        // Don't retain the buffers in the non-zero-copy path since we're copying them
        bb =
            new CompositeByteBuf(
                UnpooledByteBufAllocator.DEFAULT, /* direct */ true, maxNumComponents, byteBufs);
      }
      return new DrainableByteBufInputStream(bb, tryZeroCopyWrite);
    } catch (Exception ex) {
      throw new RuntimeException("Unexpected IO Exception", ex);
    }
  }