in flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java [239:317]
public boolean next() {
  // Blocks until the next queued item is available and processes it.
  //
  // Returns true after consuming a metadata-only message (HeaderType.NONE) or a record batch,
  // and false once the DONE marker is seen or the stream is already completed and drained.
  // Dictionary batches are loaded into the matching dictionary vector and are not reported to
  // the caller; processing moves straight on to the following message.
  //
  // Failure translation: RuntimeExceptions propagate unchanged, an ExecutionException is
  // converted with StatusUtils.fromThrowable on its cause, and every other checked exception is
  // wrapped in a RuntimeException.
  try {
    // Loop instead of recursing for dictionary batches: a long run of consecutive dictionary
    // batches would otherwise grow the call stack by one frame per batch and keep every
    // ArrowMessage open until the whole chain unwound.
    while (true) {
      if (completed.isDone() && queue.isEmpty()) {
        return false;
      }

      pending.decrementAndGet();
      requestOutstanding();

      Object data = queue.take();
      if (DONE == data) {
        // Put the marker back so that later calls observe the end of the stream as well.
        queue.put(DONE);
        // Other code ignores the value of this CompletableFuture, only whether it's completed (or
        // has an exception)
        completed.complete(null);
        return false;
      } else if (DONE_EX == data) {
        // Put the marker back so that later calls fail the same way.
        queue.put(DONE_EX);
        if (ex instanceof Exception) {
          throw (Exception) ex;
        } else {
          // Non-Exception Throwables are wrapped so they flow through the catch clauses below.
          throw new Exception(ex);
        }
      } else {
        try (ArrowMessage msg = ((ArrowMessage) data)) {
          if (msg.getMessageType() == HeaderType.NONE) {
            updateMetadata(msg);
            // We received a message without data, so erase any leftover data
            if (fulfilledRoot != null) {
              fulfilledRoot.clear();
            }
          } else if (msg.getMessageType() == HeaderType.RECORD_BATCH) {
            checkMetadataVersion(msg);
            // Ensure we have the root
            root.get().clear();
            try (ArrowRecordBatch arb = msg.asRecordBatch()) {
              loader.load(arb);
            }
            updateMetadata(msg);
          } else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) {
            checkMetadataVersion(msg);
            // Ensure we have the root
            root.get().clear();
            try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) {
              final long id = arb.getDictionaryId();
              if (dictionaries == null) {
                throw new IllegalStateException(
                    "Dictionary ownership was claimed by the application.");
              }
              final Dictionary dictionary = dictionaries.lookup(id);
              if (dictionary == null) {
                throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id);
              }

              // Wrap the dictionary's vector in a single-column root so a VectorLoader can fill
              // it from the batch.
              final FieldVector vector = dictionary.getVector();
              final VectorSchemaRoot dictionaryRoot =
                  new VectorSchemaRoot(
                      Collections.singletonList(vector.getField()),
                      Collections.singletonList(vector),
                      0);
              final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot);
              dictionaryLoader.load(arb.getDictionary());
            }
            // Leaving the try-with-resources via continue closes msg before the next message is
            // taken from the queue.
            continue;
          } else {
            throw new UnsupportedOperationException(
                "Message type is unsupported: " + msg.getMessageType());
          }
          return true;
        }
      }
    }
  } catch (RuntimeException e) {
    throw e;
  } catch (ExecutionException e) {
    throw StatusUtils.fromThrowable(e.getCause());
  } catch (Exception e) {
    // NOTE(review): presumably queue.take()/put() can be interrupted; if so the
    // InterruptedException lands here and is wrapped without restoring the thread's interrupt
    // status - confirm that this is intended.
    throw new RuntimeException(e);
  }
}