in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java [214:231]
/**
 * Notifies every registered reader that no more splits will arrive, once all enumerator
 * contexts agree.
 *
 * <p>This method does nothing unless {@code boundedness} equals {@code Boundedness.BOUNDED}.
 * In bounded mode, readers are signalled only when {@code firstDiscoveryComplete} is set and
 * every context held in {@code clusterEnumContextMap} reports {@code isNoMoreSplits()}.
 * Otherwise an informational message is logged and no reader is signalled.
 */
private void handleNoMoreSplits() {
    if (!Boundedness.BOUNDED.equals(boundedness)) {
        return;
    }

    // Evaluated before looking at firstDiscoveryComplete; an empty map counts as "all done".
    final boolean everyContextDone =
            clusterEnumContextMap.values().stream()
                    .allMatch(StoppableKafkaEnumContextProxy::isNoMoreSplits);

    if (!firstDiscoveryComplete || !everyContextDone) {
        logger.info("Not ready to notify no more splits to readers.");
        return;
    }

    logger.info(
            "Signal no more splits to all readers: {}",
            enumContext.registeredReaders().keySet());
    enumContext.registeredReaders().keySet().forEach(enumContext::signalNoMoreSplits);
}