in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java [105:128]
public List<RocketMQSourceSplit> snapshotState(long checkpointId) {
List<RocketMQSourceSplit> splits = super.snapshotState(checkpointId);
if (!commitOffsetsOnCheckpoint) {
return splits;
}
if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
offsetsToCommit.put(checkpointId, Collections.emptyMap());
} else {
Map<MessageQueue, Long> offsetsMap =
offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
// Put the offsets of the active splits.
for (RocketMQSourceSplit split : splits) {
// If the checkpoint is triggered before the queue min offsets
// is retrieved, do not commit the offsets for those partitions.
if (split.getStartingOffset() >= 0) {
offsetsMap.put(UtilAll.getMessageQueue(split), split.getStartingOffset());
}
}
// Put offsets of all the finished splits.
offsetsMap.putAll(offsetsOfFinishedSplits);
}
return splits;
}