in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java [98:123]
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
if (!commitOffsetsOnCheckpoint) {
return splits;
}
if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
offsetsToCommit.put(checkpointId, Collections.emptyMap());
} else {
Map<TopicPartition, OffsetAndMetadata> offsetsMap =
offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
// Put the offsets of the active splits.
for (KafkaPartitionSplit split : splits) {
// If the checkpoint is triggered before the partition starting offsets
// is retrieved, do not commit the offsets for those partitions.
if (split.getStartingOffset() >= 0) {
offsetsMap.put(
split.getTopicPartition(),
new OffsetAndMetadata(split.getStartingOffset()));
}
}
// Put offsets of all the finished splits.
offsetsMap.putAll(offsetsOfFinishedSplits);
}
return splits;
}