public boolean next()

in flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java [239:317]


  public boolean next() {
    try {
      if (completed.isDone() && queue.isEmpty()) {
        return false;
      }

      pending.decrementAndGet();
      requestOutstanding();

      Object data = queue.take();
      if (DONE == data) {
        queue.put(DONE);
        // Other code ignores the value of this CompletableFuture, only whether it's completed (or
        // has an exception)
        completed.complete(null);
        return false;
      } else if (DONE_EX == data) {
        queue.put(DONE_EX);
        if (ex instanceof Exception) {
          throw (Exception) ex;
        } else {
          throw new Exception(ex);
        }
      } else {
        try (ArrowMessage msg = ((ArrowMessage) data)) {
          if (msg.getMessageType() == HeaderType.NONE) {
            updateMetadata(msg);
            // We received a message without data, so erase any leftover data
            if (fulfilledRoot != null) {
              fulfilledRoot.clear();
            }
          } else if (msg.getMessageType() == HeaderType.RECORD_BATCH) {
            checkMetadataVersion(msg);
            // Ensure we have the root
            root.get().clear();
            try (ArrowRecordBatch arb = msg.asRecordBatch()) {
              loader.load(arb);
            }
            updateMetadata(msg);
          } else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) {
            checkMetadataVersion(msg);
            // Ensure we have the root
            root.get().clear();
            try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) {
              final long id = arb.getDictionaryId();
              if (dictionaries == null) {
                throw new IllegalStateException(
                    "Dictionary ownership was claimed by the application.");
              }
              final Dictionary dictionary = dictionaries.lookup(id);
              if (dictionary == null) {
                throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id);
              }

              final FieldVector vector = dictionary.getVector();
              final VectorSchemaRoot dictionaryRoot =
                  new VectorSchemaRoot(
                      Collections.singletonList(vector.getField()),
                      Collections.singletonList(vector),
                      0);
              final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot);
              dictionaryLoader.load(arb.getDictionary());
            }
            return next();
          } else {
            throw new UnsupportedOperationException(
                "Message type is unsupported: " + msg.getMessageType());
          }
          return true;
        }
      }
    } catch (RuntimeException e) {
      throw e;
    } catch (ExecutionException e) {
      throw StatusUtils.fromThrowable(e.getCause());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }