private void onHandleSubscribedStreamsFetch()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java [235:316]


    /**
     * Callback invoked with the outcome of a subscribed-streams metadata fetch.
     *
     * <p>Marks the first discovery as complete, passes the fetch outcome through {@code
     * handleFetchSubscribedStreamsError}, and flattens the resulting streams into a cluster id to
     * topics map plus a cluster id to properties map. If the topics map equals {@code
     * latestClusterTopicsMap} the method returns without further work. Otherwise it snapshots the
     * current enumerator state, closes all enumerators and contexts, stores the new metadata,
     * calls {@code sendMetadataUpdateEventToAvailableReaders}, and re-creates one enumerator per
     * cluster before starting them all.
     *
     * <p>NOTE(review): only the topics map takes part in the change check; a change limited to
     * cluster properties does not trigger a restart here - confirm this is intended.
     *
     * @param fetchedKafkaStreams the streams returned by the metadata fetch; forwarded unchanged
     *     to {@code handleFetchSubscribedStreamsError}
     * @param t the failure reported by the fetch, forwarded unchanged to {@code
     *     handleFetchSubscribedStreamsError} (presumably {@code null} on success - verify against
     *     the caller)
     * @throws RuntimeException if snapshotting the current enumerator state fails
     */
    private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams, Throwable t) {
        firstDiscoveryComplete = true;
        final Set<KafkaStream> resolvedStreams =
                handleFetchSubscribedStreamsError(fetchedKafkaStreams, t);

        // Flatten every stream's per-cluster metadata into two lookups keyed by cluster id.
        // Topics are unioned across streams; for properties the last stream visited wins.
        final Map<String, Set<String>> discoveredTopicsByCluster = new HashMap<>();
        final Map<String, Properties> propertiesByCluster = new HashMap<>();
        for (KafkaStream stream : resolvedStreams) {
            stream.getClusterMetadataMap()
                    .forEach(
                            (clusterId, metadata) -> {
                                discoveredTopicsByCluster
                                        .computeIfAbsent(clusterId, ignored -> new HashSet<>())
                                        .addAll(metadata.getTopics());
                                propertiesByCluster.put(clusterId, metadata.getProperties());
                            });
        }

        // Nothing to do when the cluster/topic layout is the same as the last one recorded.
        if (latestClusterTopicsMap.equals(discoveredTopicsByCluster)) {
            return;
        }

        if (logger.isInfoEnabled()) {
            // Sorted copies make the before/after difference easy to spot in the log; the guard
            // avoids building them when INFO logging is disabled.
            final Map<String, Set<String>> sortedPrevious = new TreeMap<>(latestClusterTopicsMap);
            final Map<String, Set<String>> sortedNew = new TreeMap<>(discoveredTopicsByCluster);
            logger.info(
                    "Detected changed cluster topics after metadata refresh:\nPrevious: {}\nNew: {}",
                    sortedPrevious,
                    sortedNew);
        }

        // The snapshot must be taken before the enumerators are closed below.
        // NOTE(review): -1 is presumably a placeholder checkpoint id - confirm against
        // snapshotState.
        final DynamicKafkaSourceEnumState stateBeforeRestart;
        try {
            stateBeforeRestart = snapshotState(-1);
        } catch (Exception e) {
            throw new RuntimeException("unable to snapshot state in metadata change", e);
        }

        logger.info("Closing enumerators due to metadata change");

        closeAllEnumeratorsAndContexts();
        latestClusterTopicsMap = discoveredTopicsByCluster;
        latestKafkaStreams = resolvedStreams;
        sendMetadataUpdateEventToAvailableReaders();

        // Re-create one enumerator per active cluster.
        for (Entry<String, Set<String>> clusterEntry : latestClusterTopicsMap.entrySet()) {
            final String clusterId = clusterEntry.getKey();
            final Set<String> subscribedTopics = clusterEntry.getValue();
            final KafkaSourceEnumState priorClusterState =
                    stateBeforeRestart.getClusterEnumeratorStates().get(clusterId);

            final KafkaSourceEnumState restartState;
            if (priorClusterState == null) {
                // No snapshot entry for this cluster: begin with an empty state.
                restartState = new KafkaSourceEnumState(Collections.emptySet(), false);
            } else {
                // Keep only the partitions whose topic is still subscribed.
                final Set<TopicPartitionAndAssignmentStatus> retainedPartitions =
                        priorClusterState.partitions().stream()
                                .filter(
                                        status ->
                                                subscribedTopics.contains(
                                                        status.topicPartition().topic()))
                                .collect(Collectors.toSet());
                restartState =
                        new KafkaSourceEnumState(
                                retainedPartitions, priorClusterState.initialDiscoveryFinished());
            }

            // Restart from the filtered state so that only active topic partitions are carried
            // over, which avoids the enumerator sending duplicate splits.
            createEnumeratorWithAssignedTopicPartitions(
                    clusterId, subscribedTopics, restartState, propertiesByCluster.get(clusterId));
        }

        startAllEnumerators();
    }