public void onNext()

in flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java [413:485]


    public void onNext(ArrowMessage msg) {
      // Operations here have to be under a lock so that we don't add a message to the queue while
      // in the middle of
      // close().
      requestOutstanding();
      switch (msg.getMessageType()) {
        case NONE:
          {
            // No IPC message - pure metadata or descriptor
            if (msg.getDescriptor() != null) {
              descriptor.set(new FlightDescriptor(msg.getDescriptor()));
            }
            if (msg.getApplicationMetadata() != null) {
              enqueue(msg);
            }
            break;
          }
        case SCHEMA:
          {
            Schema schema = msg.asSchema();

            // if there is app metadata in the schema message, make sure
            // that we don't leak it.
            ArrowBuf meta = msg.getApplicationMetadata();
            if (meta != null) {
              meta.close();
            }

            final List<Field> fields = new ArrayList<>();
            final Map<Long, Dictionary> dictionaryMap = new HashMap<>();
            for (final Field originalField : schema.getFields()) {
              final Field updatedField =
                  DictionaryUtility.toMemoryFormat(originalField, allocator, dictionaryMap);
              fields.add(updatedField);
            }
            for (final Map.Entry<Long, Dictionary> entry : dictionaryMap.entrySet()) {
              dictionaries.put(entry.getValue());
            }
            schema = new Schema(fields, schema.getCustomMetadata());
            metadataVersion =
                MetadataVersion.fromFlatbufID(msg.asSchemaMessage().getMessage().version());
            try {
              MetadataV4UnionChecker.checkRead(schema, metadataVersion);
            } catch (IOException e) {
              ex = e;
              enqueue(DONE_EX);
              break;
            }

            synchronized (completed) {
              if (!completed.isDone()) {
                fulfilledRoot = VectorSchemaRoot.create(schema, allocator);
                loader = new VectorLoader(fulfilledRoot);
                if (msg.getDescriptor() != null) {
                  descriptor.set(new FlightDescriptor(msg.getDescriptor()));
                }
                root.set(fulfilledRoot);
              }
            }
            break;
          }
        case RECORD_BATCH:
        case DICTIONARY_BATCH:
          enqueue(msg);
          break;
        case TENSOR:
        default:
          ex =
              new UnsupportedOperationException(
                  "Unable to handle message of type: " + msg.getMessageType());
          enqueue(DONE_EX);
      }
    }