public void snapshotState()

in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [551:602]


    /**
     * Records the offsets consumed so far for the checkpoint identified by {@code context}.
     *
     * <p>Before snapshotting, the queues allocated to this subtask are re-computed from the
     * current topic route and compared with {@code messageQueues}. If they match, the snapshot is
     * built from {@code offsetTable} alone. If the check still fails once {@code RetryUtil} gives
     * up, the offsets stored by the previous snapshot are kept and overlaid with the entries of
     * {@code offsetTable}.
     *
     * <p>The resulting offsets are written to {@code unionOffsetStates} with exactly one entry per
     * queue and are registered in {@code pendingOffsetsToCommit} under the checkpoint id. Nothing
     * is written when the source is no longer running.
     *
     * @param context provides the id and timestamp of the checkpoint being taken
     * @throws Exception propagated from reading or updating {@code unionOffsetStates}
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // called when a snapshot for a checkpoint is requested
        log.info("Snapshotting state {} ...", context.getCheckpointId());
        if (!runningChecker.isRunning()) {
            log.info("snapshotState() called on closed source; skipping snapshot.");
            return;
        }

        Map<MessageQueue, Long> currentOffsets;
        try {
            // Discovers topic route change when snapshot
            RetryUtil.call(
                    () -> {
                        Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(topic);
                        int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks();
                        int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
                        List<MessageQueue> newQueues =
                                RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
                        Collections.sort(newQueues);
                        if (!messageQueues.equals(newQueues)) {
                            throw new RuntimeException(
                                    "Topic route changed for subtask " + taskIndex);
                        }
                        // Only report an unchanged route after the comparison has passed.
                        log.debug("{} Topic route is same.", taskIndex);
                        return true;
                    },
                    "RuntimeException due to topic route changed");

            currentOffsets = new HashMap<>(offsetTable.size());
        } catch (RuntimeException e) {
            log.warn(
                    "Retry failed multiple times for topic route change, keep previous offset.",
                    e);
            // If the retry fails for multiple times, the message queue and its offset in the
            // previous checkpoint will be retained.
            currentOffsets = new HashMap<>();
            for (Tuple2<MessageQueue, Long> previousOffset : unionOffsetStates.get()) {
                currentOffsets.put(previousOffset.f0, previousOffset.f1);
            }
        }

        // Offsets tracked by this subtask take precedence over any retained ones.
        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
            currentOffsets.put(entry.getKey(), entry.getValue());
        }

        // Rewrite the state from the merged map on both paths. Appending to the retained state
        // would store the same queue again on every snapshot taken while the route check fails.
        unionOffsetStates.clear();
        for (Map.Entry<MessageQueue, Long> entry : currentOffsets.entrySet()) {
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }

        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
        log.info(
                "Snapshot state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                offsetTable,
                context.getCheckpointId(),
                context.getCheckpointTimestamp());
    }