public void handleSplitsChanges()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java [175:227]


    public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
        // Get all the partition assignments and stopping offsets.
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChange.getClass()));
        }

        // Assignment.
        List<TopicPartition> newPartitionAssignments = new ArrayList<>();
        // Starting offsets.
        Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new HashMap<>();
        List<TopicPartition> partitionsStartingFromEarliest = new ArrayList<>();
        List<TopicPartition> partitionsStartingFromLatest = new ArrayList<>();
        // Stopping offsets.
        List<TopicPartition> partitionsStoppingAtLatest = new ArrayList<>();
        Set<TopicPartition> partitionsStoppingAtCommitted = new HashSet<>();

        // Parse the starting and stopping offsets.
        splitsChange
                .splits()
                .forEach(
                        s -> {
                            newPartitionAssignments.add(s.getTopicPartition());
                            parseStartingOffsets(
                                    s,
                                    partitionsStartingFromEarliest,
                                    partitionsStartingFromLatest,
                                    partitionsStartingFromSpecifiedOffsets);
                            parseStoppingOffsets(
                                    s, partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
                            // Track the new topic partition in metrics
                            kafkaSourceReaderMetrics.registerTopicPartition(s.getTopicPartition());
                        });

        // Assign new partitions.
        newPartitionAssignments.addAll(consumer.assignment());
        consumer.assign(newPartitionAssignments);

        // Seek on the newly assigned partitions to their stating offsets.
        seekToStartingOffsets(
                partitionsStartingFromEarliest,
                partitionsStartingFromLatest,
                partitionsStartingFromSpecifiedOffsets);
        // Setup the stopping offsets.
        acquireAndSetStoppingOffsets(partitionsStoppingAtLatest, partitionsStoppingAtCommitted);

        // After acquiring the starting and stopping offsets, remove the empty splits if necessary.
        removeEmptySplits();

        maybeLogSplitChangesHandlingResult(splitsChange);
    }