in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java [180:222]
public void addSplits(List<DynamicKafkaSourceSplit> splits) {
    // Routes every incoming split to the sub-reader of the Kafka cluster it belongs to,
    // creating and starting that sub-reader first when none is registered yet.
    logger.info("Adding splits to reader {}: {}", readerContext.getIndexOfSubtask(), splits);

    // At startup the splits are only buffered: nothing is handed to a sub-reader until the
    // enumerator has confirmed the current metadata.
    if (!isActivelyConsumingSplits) {
        pendingSplits.addAll(splits);
        return;
    }

    // Bucket the splits by the id of the Kafka cluster they target.
    Map<String, List<KafkaPartitionSplit>> splitsByCluster = new HashMap<>();
    for (DynamicKafkaSourceSplit incoming : splits) {
        String clusterId = incoming.getKafkaClusterId();
        List<KafkaPartitionSplit> bucket =
                splitsByCluster.computeIfAbsent(clusterId, unused -> new ArrayList<>());
        bucket.add(incoming);
    }

    boolean createdSubReader = false;
    for (Map.Entry<String, List<KafkaPartitionSplit>> entry : splitsByCluster.entrySet()) {
        String clusterId = entry.getKey();

        // Splits may arrive before the source event for their cluster, so the matching
        // sub-reader is created lazily here when it is missing.
        if (!clusterReaderMap.containsKey(clusterId)) {
            try {
                KafkaSourceReader<T> freshReader = createReader(clusterId);
                // Register first, then start -- same order as before.
                clusterReaderMap.put(clusterId, freshReader);
                freshReader.start();
                createdSubReader = true;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        // Hand this cluster's splits to its sub-reader.
        KafkaSourceReader<T> target = clusterReaderMap.get(clusterId);
        target.addSplits(entry.getValue());
    }

    // A newly created sub-reader must be reflected in the availability future, so reset it.
    if (createdSubReader) {
        completeAndResetAvailabilityHelper();
    }
}