public void initOffsetTableFromRestoredOffsets()

in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [540:561]


    /**
     * Populates {@code offsetTable} from {@code restoredOffsets} for the given queues.
     *
     * <p>Every restored entry whose queue appears in {@code messageQueues} is copied into {@code
     * offsetTable}. Any queue in {@code messageQueues} that is still absent from {@code
     * offsetTable} afterwards is handed to {@code initOffsets}.
     *
     * @param messageQueues the queues whose offsets should be present in {@code offsetTable}
     * @throws MQClientException declared by this method; presumably raised by {@code
     *     initOffsets} when it cannot resolve an offset (not visible from this method)
     */
    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues)
            throws MQClientException {
        Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");

        // Copy over only those restored offsets that belong to the requested queues.
        restoredOffsets.forEach(
                (queue, restoredOffset) -> {
                    if (!messageQueues.contains(queue)) {
                        return;
                    }
                    offsetTable.put(queue, restoredOffset);
                });

        // Whatever is still missing from the offset table had no restored offset.
        List<MessageQueue> queuesWithoutOffset = new ArrayList<>(messageQueues);
        queuesWithoutOffset.removeIf(queue -> offsetTable.containsKey(queue));

        if (!queuesWithoutOffset.isEmpty()) {
            log.info(
                    "no restoredOffsets for {}, so init offset for these queues",
                    queuesWithoutOffset);
            initOffsets(queuesWithoutOffset);
        }
        log.info("init offset table [{}] from restoredOffsets successful.", offsetTable);
    }