in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java [135:162]
public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
// at startup, do not return end of input if metadata event has not been received
if (clusterReaderMap.isEmpty()) {
return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
}
if (restartingReaders.get()) {
logger.debug("Poll next invoked while restarting readers");
return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
}
boolean isMoreAvailable = false;
boolean isNothingAvailable = false;
for (Map.Entry<String, KafkaSourceReader<T>> clusterReader : clusterReaderMap.entrySet()) {
InputStatus inputStatus = clusterReader.getValue().pollNext(readerOutput);
switch (inputStatus) {
case MORE_AVAILABLE:
isMoreAvailable = true;
break;
case NOTHING_AVAILABLE:
isNothingAvailable = true;
break;
}
}
return logAndReturnInputStatus(consolidateInputStatus(isMoreAvailable, isNothingAvailable));
}