in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [647:677]
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// callback when checkpoint complete
if (!runningChecker.isRunning()) {
log.info("notifyCheckpointComplete() called on closed source; returning null.");
return;
}
final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
if (posInMap == -1) {
log.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
return;
}
Map<MessageQueue, Long> offsets =
(Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
pendingOffsetsToCommit.remove(0);
}
if (offsets == null || offsets.size() == 0) {
log.debug("Checkpoint state was empty.");
return;
}
for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
consumer.getOffsetStore().updateOffset(entry.getKey(), entry.getValue(), false);
consumer.getOffsetStore().persist(consumer.queueWithNamespace(entry.getKey()));
}
}