public InputStatus pollNext()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java [135:162]


    public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
        // Polls every reader in clusterReaderMap once with the supplied output and reports a
        // single status built from the individual results.

        // No readers registered yet: report NOTHING_AVAILABLE instead of end of input, because
        // at startup the metadata event may not have been received.
        if (clusterReaderMap.isEmpty()) {
            return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
        }

        // Readers are currently being restarted; do not poll them during this invocation.
        if (restartingReaders.get()) {
            logger.debug("Poll next invoked while restarting readers");
            return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
        }

        boolean anyMoreAvailable = false;
        boolean anyNothingAvailable = false;

        // Only the readers themselves are needed here; the map keys play no role in polling.
        for (KafkaSourceReader<T> reader : clusterReaderMap.values()) {
            switch (reader.pollNext(readerOutput)) {
                case MORE_AVAILABLE:
                    anyMoreAvailable = true;
                    break;
                case NOTHING_AVAILABLE:
                    anyNothingAvailable = true;
                    break;
                default:
                    // Any other status leaves both flags untouched.
                    break;
            }
        }

        // The two flags are handed to consolidateInputStatus, which picks the overall status.
        return logAndReturnInputStatus(
                consolidateInputStatus(anyMoreAvailable, anyNothingAvailable));
    }