in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java [230:251]
/**
 * Acknowledges, in a single batch, the latest consumed message id of every partition this
 * reader currently knows about.
 *
 * <p>The batch is built from the cursors recorded for finished splits plus the latest consumed
 * id of each split returned by {@code super.snapshotState}. When a partition appears in both,
 * the position of the live split replaces the finished-split entry in the batch.
 *
 * <p>On success, the finished-split cursors that were part of this batch are dropped. On
 * failure the exception is logged and stored in {@code cursorCommitThrowable} unless an earlier
 * failure is already recorded there; nothing is rethrown.
 */
private void cumulativeAcknowledgmentMessage() {
    // Keep an untouched snapshot of the finished-split cursors so that the cleanup below
    // removes exactly what this round handled.
    Map<TopicPartition, MessageId> finishedCursors = new HashMap<>(cursorsOfFinishedSplits);
    Map<TopicPartition, MessageId> cursors = new HashMap<>(finishedCursors);

    // snapshotState is reused only to obtain the current consuming status of the splits.
    // No checkpoint is actually taking place, so a fake checkpoint id is passed.
    List<PulsarPartitionSplit> splits = super.snapshotState(1L);
    for (PulsarPartitionSplit split : splits) {
        MessageId latestConsumedId = split.getLatestConsumedId();
        // Splits without a consumed position are left out of the batch.
        if (latestConsumedId != null) {
            cursors.put(split.getPartition(), latestConsumedId);
        }
    }

    try {
        ((PulsarSourceFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
        // Clean up the finished splits. Each entry is removed only if it still maps to the
        // value captured in the snapshot, so an entry that was added or updated after the
        // snapshot was taken is kept for the next round instead of being silently discarded.
        // NOTE(review): presumably this method runs on a different thread than the code that
        // fills cursorsOfFinishedSplits - confirm. Without concurrent updates this is
        // equivalent to removing all snapshot keys.
        for (Map.Entry<TopicPartition, MessageId> finished : finishedCursors.entrySet()) {
            cursorsOfFinishedSplits.remove(finished.getKey(), finished.getValue());
        }
    } catch (Exception e) {
        LOG.error("Fail in auto cursor commit.", e);
        // Only the first failure is retained.
        cursorCommitThrowable.compareAndSet(null, e);
    }
}