in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java [235:316]
private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams, Throwable t) {
    firstDiscoveryComplete = true;
    Set<KafkaStream> handledFetchKafkaStreams =
            handleFetchSubscribedStreamsError(fetchedKafkaStreams, t);
    Map<String, Set<String>> newClustersTopicsMap = new HashMap<>();
    Map<String, Properties> clusterProperties = new HashMap<>();
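    // aggregate the fetched stream metadata into per-cluster topic sets and
    // per-cluster Kafka client properties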
    for (KafkaStream kafkaStream : handledFetchKafkaStreams) {
        for (Entry<String, ClusterMetadata> entry :
                kafkaStream.getClusterMetadataMap().entrySet()) {
            String kafkaClusterId = entry.getKey();
            ClusterMetadata clusterMetadata = entry.getValue();

            newClustersTopicsMap
                    .computeIfAbsent(kafkaClusterId, (unused) -> new HashSet<>())
                    .addAll(clusterMetadata.getTopics());
            clusterProperties.put(kafkaClusterId, clusterMetadata.getProperties());
        }
    }
    // don't do anything if no change
    if (latestClusterTopicsMap.equals(newClustersTopicsMap)) {
        return;
    }

    if (logger.isInfoEnabled()) {
        // log the maps in a sorted fashion so it's easy to see the changes
        logger.info(
                "Detected changed cluster topics after metadata refresh:\nPrevious: {}\nNew: {}",
                new TreeMap<>(latestClusterTopicsMap),
                new TreeMap<>(newClustersTopicsMap));
    }
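
    // snapshot the current split assignments so that already-assigned partitions
    // carry over when the per-cluster enumerators are recreated below; -1 is a
    // placeholder since this call is not tied to a real checkpoint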
    DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState;
    try {
        dynamicKafkaSourceEnumState = snapshotState(-1);
    } catch (Exception e) {
        throw new RuntimeException("Unable to snapshot state during metadata change", e);
    }
logger.info("Closing enumerators due to metadata change");
closeAllEnumeratorsAndContexts();
latestClusterTopicsMap = newClustersTopicsMap;
latestKafkaStreams = handledFetchKafkaStreams;
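    // notify the currently-registered readers of the new metadata so they can
    // reconcile their splits before the recreated enumerators assign new ones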
    sendMetadataUpdateEventToAvailableReaders();
    // create enumerators for each active cluster, seeding them from the
    // snapshotted state where the cluster existed before
    for (Entry<String, Set<String>> activeClusterTopics : latestClusterTopicsMap.entrySet()) {
        KafkaSourceEnumState kafkaSourceEnumState =
                dynamicKafkaSourceEnumState
                        .getClusterEnumeratorStates()
                        .get(activeClusterTopics.getKey());

        final KafkaSourceEnumState newKafkaSourceEnumState;
        if (kafkaSourceEnumState != null) {
            final Set<String> activeTopics = activeClusterTopics.getValue();

            // filter out removed topics
            Set<TopicPartitionAndAssignmentStatus> partitions =
                    kafkaSourceEnumState.partitions().stream()
                            .filter(tp -> activeTopics.contains(tp.topicPartition().topic()))
                            .collect(Collectors.toSet());

            newKafkaSourceEnumState =
                    new KafkaSourceEnumState(
                            partitions, kafkaSourceEnumState.initialDiscoveryFinished());
        } else {
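            // no prior state for this cluster (newly added): start with an empty
            // state and initial discovery not yet finished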
            newKafkaSourceEnumState = new KafkaSourceEnumState(Collections.emptySet(), false);
        }

        // restart the enumerator from state containing only the active topic
        // partitions, so the enumerator does not send duplicate splits
        createEnumeratorWithAssignedTopicPartitions(
                activeClusterTopics.getKey(),
                activeClusterTopics.getValue(),
                newKafkaSourceEnumState,
                clusterProperties.get(activeClusterTopics.getKey()));
    }
    startAllEnumerators();
}