in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java [177:194]
public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
// Perform a snapshot for these splits.
Map<TopicPartition, MessageId> cursors =
cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
// Put the cursors of the active splits.
for (PulsarPartitionSplit split : splits) {
MessageId latestConsumedId = split.getLatestConsumedId();
if (latestConsumedId != null) {
cursors.put(split.getPartition(), latestConsumedId);
}
}
// Put cursors of all the finished splits.
cursors.putAll(cursorsOfFinishedSplits);
return splits;
}