in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [564:615]
// Records the offsets to be stored for the checkpoint identified by {@code context}.
//
// Steps:
//   1. Do nothing if the source is no longer running.
//   2. Re-fetch the queues of the topic, re-allocate them for this subtask and compare the
//      result with the queues currently assigned (messageQueues). The check is retried through
//      RetryUtil.
//   3. If the allocation is unchanged, the snapshot consists of the offsets in offsetTable.
//      If the check keeps failing, the offsets already held in unionOffsetStates are kept and
//      the offsets in offsetTable are laid over them.
//   4. unionOffsetStates is rebuilt from the resulting map, and the same map is registered in
//      pendingOffsetsToCommit under the checkpoint id.
//
// Throws: Exception if reading or writing the offset state fails.
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // called when a snapshot for a checkpoint is requested
    log.info("Snapshotting state {} ...", context.getCheckpointId());
    if (!runningChecker.isRunning()) {
        log.info("snapshotState() called on closed source; returning null.");
        return;
    }

    Map<MessageQueue, Long> currentOffsets;
    try {
        // Discovers topic route change when snapshot
        RetryUtil.call(
                () -> {
                    Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(topic);
                    int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks();
                    int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
                    List<MessageQueue> newQueues =
                            RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
                    Collections.sort(newQueues);
                    if (!messageQueues.equals(newQueues)) {
                        throw new RuntimeException(
                                "Topic route of "
                                        + topic
                                        + " changed for subtask "
                                        + taskIndex);
                    }
                    // Only reached when the allocation matches the current assignment.
                    log.debug("{} Topic route is same.", taskIndex);
                    return true;
                },
                "RuntimeException due to topic route changed");
        currentOffsets = new HashMap<>(offsetTable.size());
    } catch (RuntimeException e) {
        log.warn(
                "Retry failed multiple times for topic route change, keep previous offset.", e);
        // If the retry fails for multiple times, the message queue and its offset in the
        // previous checkpoint will be retained. They are copied out here, before the state is
        // cleared below.
        currentOffsets = new HashMap<>();
        for (Tuple2<MessageQueue, Long> previousOffset : unionOffsetStates.get()) {
            currentOffsets.put(previousOffset.f0, previousOffset.f1);
        }
    }

    // Offsets processed by this subtask take precedence over retained ones.
    for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
        currentOffsets.put(entry.getKey(), entry.getValue());
    }

    // Rebuild the state from the merged map so that it holds exactly one entry per queue;
    // appending to the retained entries would add duplicates on every fallback checkpoint.
    unionOffsetStates.clear();
    for (Map.Entry<MessageQueue, Long> entry : currentOffsets.entrySet()) {
        unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
    }

    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
    log.info(
            "Snapshot state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
            offsetTable,
            context.getCheckpointId(),
            context.getCheckpointTimestamp());
}