public void addSplits(List&lt;DynamicKafkaSourceSplit&gt; splits)

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java [180:222]
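
This method routes newly assigned splits to per-cluster sub-readers. At startup it buffers incoming splits until the enumerator confirms the current metadata; once actively consuming, it groups the splits by Kafka cluster id, lazily creates and starts a KafkaSourceReader for any cluster it has not seen before, and, if a new sub-reader was created, completes and resets the availability helper so the reader's compound availability future covers the new sub-reader as well.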


    public void addSplits(List<DynamicKafkaSourceSplit> splits) {
        logger.info("Adding splits to reader {}: {}", readerContext.getIndexOfSubtask(), splits);
        // at startup, don't add splits until the enumerator confirms the current metadata
        if (!isActivelyConsumingSplits) {
            pendingSplits.addAll(splits);
            return;
        }

        // group the incoming splits by the Kafka cluster they belong to
        Map<String, List<KafkaPartitionSplit>> clusterSplitsMap = new HashMap<>();
        for (DynamicKafkaSourceSplit split : splits) {
            clusterSplitsMap
                    .computeIfAbsent(split.getKafkaClusterId(), unused -> new ArrayList<>())
                    .add(split);
        }

        Set<String> kafkaClusterIds = clusterSplitsMap.keySet();

        boolean newCluster = false;
        for (String kafkaClusterId : kafkaClusterIds) {
            // if a reader for this cluster doesn't exist yet, create it; splits can
            // arrive before the corresponding metadata update source event
            if (!clusterReaderMap.containsKey(kafkaClusterId)) {
                try {
                    KafkaSourceReader<T> kafkaSourceReader = createReader(kafkaClusterId);
                    clusterReaderMap.put(kafkaClusterId, kafkaSourceReader);
                    kafkaSourceReader.start();
                    newCluster = true;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }

            // hand this cluster's splits to its sub-reader
            KafkaSourceReader<T> reader = clusterReaderMap.get(kafkaClusterId);
            reader.addSplits(clusterSplitsMap.get(kafkaClusterId));
        }

        // reset the availability future to also depend on the new sub-readers
        if (newCluster) {
            completeAndResetAvailabilityHelper();
        }
    }
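
For context, the snippet below is a minimal, self-contained sketch of the buffer-then-flush pattern the method starts with. The class and method names (BufferingReaderSketch, activate, assignToSubReaders) are hypothetical stand-ins, not the connector's API; activate() represents the point where the real reader has received the enumerator's metadata confirmation and can drain pendingSplits back through addSplits.

    import java.util.ArrayList;
    import java.util.List;

    class BufferingReaderSketch {
        private final List<String> pendingSplits = new ArrayList<>();
        private boolean isActivelyConsumingSplits = false;

        public void addSplits(List<String> splits) {
            if (!isActivelyConsumingSplits) {
                // not yet activated: park the splits for later
                pendingSplits.addAll(splits);
                return;
            }
            assignToSubReaders(splits);
        }

        // called once the enumerator has confirmed the current metadata
        public void activate() {
            isActivelyConsumingSplits = true;
            // drain the buffer through the normal assignment path
            List<String> buffered = new ArrayList<>(pendingSplits);
            pendingSplits.clear();
            addSplits(buffered);
        }

        private void assignToSubReaders(List<String> splits) {
            // stand-in for routing splits to per-cluster sub-readers
            splits.forEach(split -> System.out.println("assigned " + split));
        }
    }

Note also the completeAndResetAvailabilityHelper() call at the end of the real method: when a new per-cluster sub-reader is created, the compound availability future the reader exposes to the runtime must be completed and rebuilt so that it also reflects the new sub-reader's availability.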