public void save()

in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java [65:97]


    /**
     * Pushes the given consume offsets to the brokers that own the corresponding queues.
     *
     * <p>For every subscription the broker addresses are resolved through
     * {@code findBrokers}; then, for each queue of that subscription, an
     * {@link UpdateConsumerOffsetRequestHeader} is built and sent with
     * {@code updateConsumerOffset}. The operation is best-effort: a failure for one
     * queue is logged and does not prevent the remaining queues or subscriptions
     * from being processed.
     *
     * <p>Entries are skipped (no request is sent) when:
     * <ul>
     *   <li>no broker addresses are found for the subscription,</li>
     *   <li>the subscription has no queue/offset map,</li>
     *   <li>the queue or its offset is {@code null},</li>
     *   <li>the queue has a blank broker name,</li>
     *   <li>no address is known for the queue's broker name.</li>
     * </ul>
     *
     * @param clientId  client identifier; the consumer group is {@code MixAll.LMQ_PREFIX + clientId}
     * @param offsetMap offsets to save, keyed by subscription and then by queue;
     *                  {@code null} or empty makes this method a no-op
     */
    public void save(String clientId, Map<Subscription, Map<Queue, QueueOffset>> offsetMap) {
        if (offsetMap == null || offsetMap.isEmpty()) {
            return;
        }
        for (Map.Entry<Subscription, Map<Queue, QueueOffset>> entry : offsetMap.entrySet()) {
            Map<String, String> tmpBrokerAddressMap = findBrokers(entry.getKey());
            if (tmpBrokerAddressMap == null || tmpBrokerAddressMap.isEmpty()) {
                continue;
            }
            Map<Queue, QueueOffset> queueOffsets = entry.getValue();
            if (queueOffsets == null || queueOffsets.isEmpty()) {
                // A subscription without offsets must not abort the other subscriptions.
                continue;
            }
            for (Map.Entry<Queue, QueueOffset> each : queueOffsets.entrySet()) {
                try {
                    Queue queue = each.getKey();
                    QueueOffset queueOffset = each.getValue();
                    if (queue == null || queueOffset == null) {
                        continue;
                    }
                    if (StringUtils.isBlank(queue.getBrokerName())) {
                        continue;
                    }
                    String brokerAddress = tmpBrokerAddressMap.get(queue.getBrokerName());
                    if (StringUtils.isBlank(brokerAddress)) {
                        // Without an address the request cannot be routed to the broker owning the queue.
                        logger.warn("Skip saving LMQ offset, no address for broker " + queue.getBrokerName()
                                + ", clientId=" + clientId);
                        continue;
                    }
                    UpdateConsumerOffsetRequestHeader updateHeader = new UpdateConsumerOffsetRequestHeader();
                    // The queue name has "/" replaced by "%" before it is used as the topic name.
                    updateHeader.setTopic(MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/", "%"));
                    updateHeader.setConsumerGroup(MixAll.LMQ_PREFIX + clientId);
                    updateHeader.setQueueId((int) queue.getQueueId());
                    updateHeader.setCommitOffset(queueOffset.getOffset());
                    defaultMQPullConsumer
                            .getDefaultMQPullConsumerImpl()
                            .getRebalanceImpl()
                            .getmQClientFactory()
                            .getMQClientAPIImpl().updateConsumerOffset(brokerAddress, updateHeader, 1000);
                } catch (Exception e) {
                    // Best-effort: record which client/queue failed and carry on with the next queue.
                    logger.error("Failed to save LMQ offset, clientId=" + clientId
                            + ", queue=" + each.getKey(), e);
                }
            }
        }
    }