private static ArrowMessage frame()

in flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java [282:365]


  /**
   * Decodes one message from {@code stream} by reading (tag, size, payload) fields until
   * {@code stream.available()} reports no more bytes.
   *
   * <p>Descriptor and header payloads are read into heap byte arrays; app metadata and body
   * payloads are read into buffers obtained from {@code allocator}. If a body or app metadata
   * field occurs more than once, only the last occurrence is kept and the earlier buffer is
   * released. If decoding fails, every buffer allocated so far is released before the failure is
   * rethrown, so a failed decode does not leave allocations outstanding.
   *
   * @param allocator allocator used for the app metadata and body buffers
   * @param stream the stream to decode the message from
   * @return the decoded message; any of its parts may be {@code null} if the matching field was
   *     absent, except that a dictionary/record batch header always gets a (possibly empty) body
   * @throws RuntimeException wrapping any exception raised while reading or decoding
   */
  private static ArrowMessage frame(BufferAllocator allocator, final InputStream stream) {
    // Declared outside the try block so the failure path can release them.
    ArrowBuf body = null;
    ArrowBuf appMetadata = null;
    try {
      FlightDescriptor descriptor = null;
      MessageMetadataResult header = null;
      while (stream.available() > 0) {
        int tag = readRawVarint32(stream);
        switch (tag) {
          case DESCRIPTOR_TAG:
            {
              int size = readRawVarint32(stream);
              byte[] bytes = new byte[size];
              ByteStreams.readFully(stream, bytes);
              descriptor = FlightDescriptor.parseFrom(bytes);
              break;
            }
          case HEADER_TAG:
            {
              int size = readRawVarint32(stream);
              byte[] bytes = new byte[size];
              ByteStreams.readFully(stream, bytes);
              header = MessageMetadataResult.create(ByteBuffer.wrap(bytes), size);
              break;
            }
          case APP_METADATA_TAG:
            {
              if (appMetadata != null) {
                // Only keep the last app metadata; release the earlier buffer so it is not leaked.
                appMetadata.getReferenceManager().release();
                appMetadata = null;
              }
              int size = readRawVarint32(stream);
              appMetadata = allocator.buffer(size);
              GetReadableBuffer.readIntoBuffer(stream, appMetadata, size, ENABLE_ZERO_COPY_READ);
              break;
            }
          case BODY_TAG:
            {
              if (body != null) {
                // Only keep the last body; release the earlier buffer so it is not leaked.
                body.getReferenceManager().release();
                body = null;
              }
              int size = readRawVarint32(stream);
              body = allocator.buffer(size);
              GetReadableBuffer.readIntoBuffer(stream, body, size, ENABLE_ZERO_COPY_READ);
              break;
            }
          default:
            // Unknown tags are ignored.
            // NOTE(review): the unknown field's payload is not skipped here, so the next loop
            // iteration reads whatever follows the tag as a new tag - confirm this is acceptable.
            break;
        }
      }
      // Protobuf implementations can omit empty fields, such as body. For some message types,
      // like RecordBatch, that fails later because an empty buffer is still expected, so an empty
      // buffer is filled in for those types only. For other types, like Schema, an unexpected
      // empty buffer also causes failures, so it is dropped instead. No defaults are filled in
      // for header, which has no reasonable default, or for appMetadata and descriptor, which
      // are intended to be empty in some cases.
      if (header != null) {
        switch (HeaderType.getHeader(header.headerType())) {
          case SCHEMA:
            // Ignore 0-length buffers in case a Protobuf implementation wrote it out
            if (body != null && body.capacity() == 0) {
              body.close();
              body = null;
            }
            break;
          case DICTIONARY_BATCH:
          case RECORD_BATCH:
            // A Protobuf implementation can skip 0-length bodies, so ensure we fill it in here
            if (body == null) {
              body = allocator.getEmpty();
            }
            break;
          case NONE:
          case TENSOR:
          default:
            // Do nothing
            break;
        }
      }
      return new ArrowMessage(descriptor, header, appMetadata, body);
    } catch (Exception e) {
      // No message was returned, so nothing else will ever release these buffers.
      releaseAfterFailure(appMetadata, e);
      releaseAfterFailure(body, e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Closes {@code buffer} (if any) while unwinding from {@code failure}; a problem during the
   * close is attached to {@code failure} as a suppressed exception rather than replacing it.
   */
  private static void releaseAfterFailure(ArrowBuf buffer, Exception failure) {
    if (buffer == null) {
      return;
    }
    try {
      buffer.close();
    } catch (Exception releaseFailure) {
      failure.addSuppressed(releaseFailure);
    }
  }