in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java [229:346]
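// Handles a MetadataUpdateEvent from the enumerator: flattens the new stream
// metadata into per-cluster lookup maps, filters the current split state
// against it, and restarts the underlying sub-readers only when the metadata
// actually changed.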
public void handleSourceEvents(SourceEvent sourceEvent) {
Preconditions.checkArgument(
sourceEvent instanceof MetadataUpdateEvent,
"Received invalid source event: %s",
sourceEvent);
logger.info(
"Received source event {}: subtask={}",
sourceEvent,
readerContext.getIndexOfSubtask());
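// flatten the stream metadata into two lookup maps: cluster id -> topics and
// cluster id -> client properties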
Set<KafkaStream> newKafkaStreams = ((MetadataUpdateEvent) sourceEvent).getKafkaStreams();
Map<String, Set<String>> newClustersAndTopics = new HashMap<>();
Map<String, Properties> newClustersProperties = new HashMap<>();
for (KafkaStream kafkaStream : newKafkaStreams) {
for (Map.Entry<String, ClusterMetadata> clusterMetadataMapEntry :
kafkaStream.getClusterMetadataMap().entrySet()) {
newClustersAndTopics
.computeIfAbsent(
clusterMetadataMapEntry.getKey(), (unused) -> new HashSet<>())
.addAll(clusterMetadataMapEntry.getValue().getTopics());
newClustersProperties.put(
clusterMetadataMapEntry.getKey(),
clusterMetadataMapEntry.getValue().getProperties());
}
}
// filter the current splits against the metadata update
List<DynamicKafkaSourceSplit> currentSplitState = snapshotStateFromAllReaders(-1);
logger.info(
"Snapshotting split state for reader {}: {}",
readerContext.getIndexOfSubtask(),
currentSplitState);
Map<String, Set<String>> currentMetadataFromState = new HashMap<>();
Map<String, List<KafkaPartitionSplit>> filteredNewClusterSplitStateMap = new HashMap<>();
// the data structures above track the metadata implied by the current split
// state and the splits that remain valid under the new metadata
for (DynamicKafkaSourceSplit split : currentSplitState) {
currentMetadataFromState
.computeIfAbsent(split.getKafkaClusterId(), (ignore) -> new HashSet<>())
.add(split.getKafkaPartitionSplit().getTopic());
// check if the cluster-topic pair still exists in the metadata update
if (newClustersAndTopics.containsKey(split.getKafkaClusterId())
&& newClustersAndTopics
.get(split.getKafkaClusterId())
.contains(split.getKafkaPartitionSplit().getTopic())) {
filteredNewClusterSplitStateMap
.computeIfAbsent(split.getKafkaClusterId(), (ignore) -> new ArrayList<>())
.add(split);
} else {
logger.info("Skipping outdated split due to metadata changes: {}", split);
}
}
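// at this point, currentMetadataFromState reflects the metadata implied by the
// current splits, and filteredNewClusterSplitStateMap holds only the splits
// that remain valid under the new metadata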
// only restart if there was a metadata change, to handle duplicate
// MetadataUpdateEvents from the enumerator. We could restart only the readers
// whose metadata has changed, but that comes at the cost of extra complexity
// and is an optimization for a corner case. We can revisit if necessary.
if (!newClustersAndTopics.equals(currentMetadataFromState)) {
restartingReaders.set(true);
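// tear down all current sub-readers and clear their state; the splits worth
// retaining were already captured in currentSplitState above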
closeAllReadersAndClearState();
clustersProperties.putAll(newClustersProperties);
for (String kafkaClusterId : newClustersAndTopics.keySet()) {
try {
// restart kafka source readers with the relevant state
KafkaSourceReader<T> kafkaSourceReader = createReader(kafkaClusterId);
clusterReaderMap.put(kafkaClusterId, kafkaSourceReader);
if (filteredNewClusterSplitStateMap.containsKey(kafkaClusterId)) {
kafkaSourceReader.addSplits(
filteredNewClusterSplitStateMap.get(kafkaClusterId));
}
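// start the new reader only after its retained splits have been assigned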
kafkaSourceReader.start();
} catch (Exception e) {
throw new RuntimeException(
"Failed to restart Kafka source reader for cluster: " + kafkaClusterId,
e);
}
}
// reset the availability future to also depend on the new sub readers
completeAndResetAvailabilityHelper();
} else {
// update the properties even when there is no metadata change
clustersProperties.clear();
clustersProperties.putAll(newClustersProperties);
}
// finally, mark the reader as active if it is not already, and add the pending splits
if (!isActivelyConsumingSplits) {
isActivelyConsumingSplits = true;
}
if (!pendingSplits.isEmpty()) {
List<DynamicKafkaSourceSplit> validPendingSplits =
pendingSplits.stream()
// Pending splits are used to cache splits at startup, before the
// metadata update event arrives. Splits in state could be stale, and
// another metadata update event may never arrive, so the splits need
// to be filtered at this point.
.filter(
pendingSplit -> {
boolean splitValid =
isSplitForActiveClusters(
pendingSplit, newClustersAndTopics);
if (!splitValid) {
logger.info(
"Removing invalid split for reader: {}",
pendingSplit);
}
return splitValid;
})
.collect(Collectors.toList());
addSplits(validPendingSplits);
pendingSplits.clear();
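// if the enumerator already signaled no more splits, propagate it now that
// the pending splits have been added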
if (isNoMoreSplits) {
notifyNoMoreSplits();
}
}
}
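// For reference, a minimal sketch of the isSplitForActiveClusters helper used
// above, reconstructed from how it is called here (hypothetical; the actual
// implementation in this file may differ):
//
// private static boolean isSplitForActiveClusters(
//         DynamicKafkaSourceSplit split, Map<String, Set<String>> clustersAndTopics) {
//     Set<String> topics = clustersAndTopics.get(split.getKafkaClusterId());
//     return topics != null
//             && topics.contains(split.getKafkaPartitionSplit().getTopic());
// }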