in flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java [413:485]
public void onNext(ArrowMessage msg) {
  // Handles one incoming message by dispatching on its message type:
  //   NONE                            -> store the descriptor (if present) and queue the message
  //                                      when it carries application metadata
  //   SCHEMA                          -> rebuild the schema and publish a new root
  //   RECORD_BATCH / DICTIONARY_BATCH -> queue the message unchanged
  //   TENSOR / anything else          -> record an error and queue DONE_EX
  //
  // Author's note carried over from the original: operations here have to be under a lock so
  // that a message is not added to the queue while close() is in progress.
  requestOutstanding();
  switch (msg.getMessageType()) {
    case NONE:
      {
        // No IPC payload: the message is only a descriptor and/or application metadata.
        if (msg.getDescriptor() != null) {
          descriptor.set(new FlightDescriptor(msg.getDescriptor()));
        }
        if (msg.getApplicationMetadata() != null) {
          enqueue(msg);
        }
        break;
      }
    case SCHEMA:
      {
        final Schema receivedSchema = msg.asSchema();

        // Application metadata attached to a schema message is not forwarded; close it here
        // so the buffer is not leaked.
        final ArrowBuf appMetadata = msg.getApplicationMetadata();
        if (appMetadata != null) {
          appMetadata.close();
        }

        // Pass every field through DictionaryUtility.toMemoryFormat; the map handed to it is
        // read afterwards to register the resulting dictionaries.
        final Map<Long, Dictionary> collectedDictionaries = new HashMap<>();
        final List<Field> memoryFields = new ArrayList<>();
        for (final Field receivedField : receivedSchema.getFields()) {
          memoryFields.add(
              DictionaryUtility.toMemoryFormat(receivedField, allocator, collectedDictionaries));
        }
        for (final Dictionary dictionary : collectedDictionaries.values()) {
          dictionaries.put(dictionary);
        }

        final Schema memorySchema = new Schema(memoryFields, receivedSchema.getCustomMetadata());
        metadataVersion =
            MetadataVersion.fromFlatbufID(msg.asSchemaMessage().getMessage().version());
        try {
          MetadataV4UnionChecker.checkRead(memorySchema, metadataVersion);
        } catch (IOException e) {
          // Record the failure, signal it through the queue, and skip creating the root.
          ex = e;
          enqueue(DONE_EX);
          break;
        }

        // Only create and publish the root while the stream has not been marked as completed.
        synchronized (completed) {
          if (!completed.isDone()) {
            fulfilledRoot = VectorSchemaRoot.create(memorySchema, allocator);
            loader = new VectorLoader(fulfilledRoot);
            if (msg.getDescriptor() != null) {
              descriptor.set(new FlightDescriptor(msg.getDescriptor()));
            }
            root.set(fulfilledRoot);
          }
        }
        break;
      }
    case RECORD_BATCH:
    case DICTIONARY_BATCH:
      enqueue(msg);
      break;
    case TENSOR:
    default:
      {
        // Unsupported message type: store the error and signal it through the queue.
        final String reason = "Unable to handle message of type: " + msg.getMessageType();
        ex = new UnsupportedOperationException(reason);
        enqueue(DONE_EX);
      }
  }
}