in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java [166:218]
public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
// Gather the partition assignments and the starting and stopping offsets from the new splits.
if (!(splitsChange instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format(
"The SplitsChange type of %s is not supported.",
splitsChange.getClass()));
}
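// Only SplitsAddition is handled: the Kafka source never takes splits back
// from a reader, so any other SplitsChange type indicates a bug upstream.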
// Assignment.
List<TopicPartition> newPartitionAssignments = new ArrayList<>();
// Starting offsets.
Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new HashMap<>();
List<TopicPartition> partitionsStartingFromEarliest = new ArrayList<>();
List<TopicPartition> partitionsStartingFromLatest = new ArrayList<>();
// Stopping offsets.
List<TopicPartition> partitionsStoppingAtLatest = new ArrayList<>();
Set<TopicPartition> partitionsStoppingAtCommitted = new HashSet<>();
// Parse the starting and stopping offsets.
splitsChange
.splits()
.forEach(
s -> {
newPartitionAssignments.add(s.getTopicPartition());
parseStartingOffsets(
s,
partitionsStartingFromEarliest,
partitionsStartingFromLatest,
partitionsStartingFromSpecifiedOffsets);
parseStoppingOffsets(
s, partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
// Track the new topic partition in metrics
kafkaSourceReaderMetrics.registerTopicPartition(s.getTopicPartition());
});
// Assign the union of the current and new partitions: KafkaConsumer.assign()
// replaces the existing assignment rather than adding to it.
newPartitionAssignments.addAll(consumer.assignment());
consumer.assign(newPartitionAssignments);
// Seek the newly assigned partitions to their starting offsets.
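// seekToStartingOffsets (defined later in this file) presumably maps each
// group onto the matching KafkaConsumer call: seekToBeginning() for EARLIEST,
// seekToEnd() for LATEST, and seek(partition, offset) for explicit offsets.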
seekToStartingOffsets(
partitionsStartingFromEarliest,
partitionsStartingFromLatest,
partitionsStartingFromSpecifiedOffsets);
// Set up the stopping offsets.
acquireAndSetStoppingOffsets(partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
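// Resolving the LATEST and COMMITTED stopping markers eagerly pins each
// bounded split to a concrete offset, so records arriving later cannot move
// the stopping point.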
// After acquiring the starting and stopping offsets, remove the empty splits if necessary.
removeEmptySplits();
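// A split whose starting offset already reached its stopping offset would
// never produce a record and could otherwise never be marked finished, so it
// is removed here.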
maybeLogSplitChangesHandlingResult(splitsChange);
}