public void handleSourceEvents(SourceEvent sourceEvent)

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java [229:346]


    public void handleSourceEvents(SourceEvent sourceEvent) {
        Preconditions.checkArgument(
                sourceEvent instanceof MetadataUpdateEvent,
                "Received invalid source event: %s",
                sourceEvent);

        logger.info(
                "Received source event {}: subtask={}",
                sourceEvent,
                readerContext.getIndexOfSubtask());
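        // flatten the stream metadata from the event into per-cluster topic sets and
        // per-cluster consumer properties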
        Set<KafkaStream> newKafkaStreams = ((MetadataUpdateEvent) sourceEvent).getKafkaStreams();
        Map<String, Set<String>> newClustersAndTopics = new HashMap<>();
        Map<String, Properties> newClustersProperties = new HashMap<>();
        for (KafkaStream kafkaStream : newKafkaStreams) {
            for (Map.Entry<String, ClusterMetadata> clusterMetadataMapEntry :
                    kafkaStream.getClusterMetadataMap().entrySet()) {
                newClustersAndTopics
                        .computeIfAbsent(
                                clusterMetadataMapEntry.getKey(), (unused) -> new HashSet<>())
                        .addAll(clusterMetadataMapEntry.getValue().getTopics());

                newClustersProperties.put(
                        clusterMetadataMapEntry.getKey(),
                        clusterMetadataMapEntry.getValue().getProperties());
            }
        }

        // filter current splits with the metadata update
        List<DynamicKafkaSourceSplit> currentSplitState = snapshotStateFromAllReaders(-1);
        logger.info(
                "Snapshotting split state for reader {}: {}",
                readerContext.getIndexOfSubtask(),
                currentSplitState);
        Map<String, Set<String>> currentMetadataFromState = new HashMap<>();
        Map<String, List<KafkaPartitionSplit>> filteredNewClusterSplitStateMap = new HashMap<>();

        // populate the data structures above: record the cluster metadata currently in
        // state and retain only the splits that are still valid under the new metadata
        for (DynamicKafkaSourceSplit split : currentSplitState) {
            currentMetadataFromState
                    .computeIfAbsent(split.getKafkaClusterId(), (ignore) -> new HashSet<>())
                    .add(split.getKafkaPartitionSplit().getTopic());
            // check if cluster topic exists in the metadata update
            if (newClustersAndTopics.containsKey(split.getKafkaClusterId())
                    && newClustersAndTopics
                            .get(split.getKafkaClusterId())
                            .contains(split.getKafkaPartitionSplit().getTopic())) {
                filteredNewClusterSplitStateMap
                        .computeIfAbsent(split.getKafkaClusterId(), (ignore) -> new ArrayList<>())
                        .add(split);
            } else {
                logger.info("Skipping outdated split due to metadata changes: {}", split);
            }
        }

        // only restart if the metadata changed, so that duplicate MetadataUpdateEvents from
        // the enumerator are no-ops. We could restart only the readers whose metadata changed,
        // but that adds complexity for a corner-case optimization; we can revisit if necessary.
        if (!newClustersAndTopics.equals(currentMetadataFromState)) {
            restartingReaders.set(true);
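            // tear down all sub-readers; their splits were captured in currentSplitState
            // above and the still-valid ones are re-added to the recreated readers below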
            closeAllReadersAndClearState();

            clustersProperties.putAll(newClustersProperties);
            for (String kafkaClusterId : newClustersAndTopics.keySet()) {
                try {
                    // restart kafka source readers with the relevant state
                    KafkaSourceReader<T> kafkaSourceReader = createReader(kafkaClusterId);
                    clusterReaderMap.put(kafkaClusterId, kafkaSourceReader);
                    if (filteredNewClusterSplitStateMap.containsKey(kafkaClusterId)) {
                        kafkaSourceReader.addSplits(
                                filteredNewClusterSplitStateMap.get(kafkaClusterId));
                    }
                    kafkaSourceReader.start();
                } catch (Exception e) {
                    throw new RuntimeException(
                            "Failed to restart reader for Kafka cluster " + kafkaClusterId, e);
                }
            }

            // reset the availability future to also depend on the new sub readers
            completeAndResetAvailabilityHelper();
        } else {
            // update properties even on no metadata change
            clustersProperties.clear();
            clustersProperties.putAll(newClustersProperties);
        }

        // finally, mark the reader as actively consuming splits and flush any pending splits
        isActivelyConsumingSplits = true;

        if (!pendingSplits.isEmpty()) {
            List<DynamicKafkaSourceSplit> validPendingSplits =
                    pendingSplits.stream()
                            // pendingSplits caches splits restored at startup, before the
                            // metadata update event arrives. Those splits may be stale, and
                            // another metadata update event may never come, so they must be
                            // filtered against the new metadata here.
                            .filter(
                                    pendingSplit -> {
                                        boolean splitValid =
                                                isSplitForActiveClusters(
                                                        pendingSplit, newClustersAndTopics);
                                        if (!splitValid) {
                                            logger.info(
                                                    "Removing invalid split for reader: {}",
                                                    pendingSplit);
                                        }
                                        return splitValid;
                                    })
                            .collect(Collectors.toList());

            addSplits(validPendingSplits);
            pendingSplits.clear();
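            // propagate a no-more-splits signal that was buffered before the metadata arrived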
            if (isNoMoreSplits) {
                notifyNoMoreSplits();
            }
        }
    }
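
For a concrete picture of the change detection above, here is a minimal, self-contained sketch using only JDK collections (the cluster and topic names are hypothetical, chosen for illustration). It mirrors the flattening of stream metadata into a cluster-to-topics map and shows why a re-delivered MetadataUpdateEvent compares equal and does not restart the readers:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Sketch only: mirrors the cluster -> topics bookkeeping in handleSourceEvents
    public class MetadataChangeCheckSketch {
        public static void main(String[] args) {
            // metadata currently reflected in split state, as built from currentSplitState
            Map<String, Set<String>> currentMetadataFromState = new HashMap<>();
            currentMetadataFromState
                    .computeIfAbsent("cluster-0", k -> new HashSet<>())
                    .add("orders");

            // metadata flattened from the incoming MetadataUpdateEvent
            Map<String, Set<String>> newClustersAndTopics = new HashMap<>();
            newClustersAndTopics
                    .computeIfAbsent("cluster-0", k -> new HashSet<>())
                    .add("orders");

            // Map and Set equality is deep, so a duplicate event compares equal
            // and the expensive reader restart is skipped
            System.out.println(newClustersAndTopics.equals(currentMetadataFromState)); // true

            // a genuinely new topic is detected as a change and would trigger
            // closeAllReadersAndClearState() plus recreation of the sub-readers
            newClustersAndTopics.get("cluster-0").add("payments");
            System.out.println(newClustersAndTopics.equals(currentMetadataFromState)); // false
        }
    }

The deep equality check is what makes re-delivered events idempotent: on a duplicate, the only effect is refreshing clustersProperties.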