public List snapshotState()

in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java [89:112]


    /**
     * Snapshots the reader state for the given checkpoint and, when
     * {@code commitOffsetsOnCheckpoint} is enabled, records the offsets associated with
     * that checkpoint in {@code offsetsToCommit}.
     *
     * <p>The recorded entry is an empty map when there are neither active splits nor
     * finished-split offsets. Otherwise it holds the non-negative starting offset of each
     * active split, followed by every entry of {@code offsetsOfFinishedSplits} (which
     * therefore wins on a key collision).
     *
     * @param checkpointId id of the checkpoint being taken; used as the key in
     *     {@code offsetsToCommit}
     * @return the splits produced by the superclass snapshot
     */
    public List<RocketMQSourceSplit> snapshotState(long checkpointId) {
        final List<RocketMQSourceSplit> activeSplits = super.snapshotState(checkpointId);

        if (commitOffsetsOnCheckpoint) {
            if (!activeSplits.isEmpty() || !offsetsOfFinishedSplits.isEmpty()) {
                final Map<MessageQueue, Long> pendingOffsets =
                        offsetsToCommit.computeIfAbsent(checkpointId, key -> new HashMap<>());

                // Offsets of the splits that are still being read.
                for (RocketMQSourceSplit activeSplit : activeSplits) {
                    final long startingOffset = activeSplit.getStartingOffset();
                    // A negative starting offset means the checkpoint was triggered before
                    // the queue's min offset was retrieved; skip committing for that queue.
                    if (startingOffset < 0) {
                        continue;
                    }
                    pendingOffsets.put(UtilAll.getMessageQueue(activeSplit), startingOffset);
                }

                // Offsets of every split that has already finished.
                pendingOffsets.putAll(offsetsOfFinishedSplits);
            } else {
                // Nothing is tracked yet: still register the checkpoint, with no offsets.
                offsetsToCommit.put(checkpointId, Collections.emptyMap());
            }
        }

        return activeSplits;
    }