in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java [65:97]
public void save(String clientId, Map<Subscription, Map<Queue, QueueOffset>> offsetMap) {
if (offsetMap == null || offsetMap.isEmpty()) {
return;
}
for (Map.Entry<Subscription, Map<Queue, QueueOffset>> entry : offsetMap.entrySet()) {
Map<String, String> tmpBrokerAddressMap = findBrokers(entry.getKey());
if (tmpBrokerAddressMap == null || tmpBrokerAddressMap.isEmpty()) {
continue;
}
for (Map.Entry<Queue, QueueOffset> each : entry.getValue().entrySet()) {
try {
Queue queue = each.getKey();
if (StringUtils.isBlank(queue.getBrokerName())) {
continue;
}
String brokerAddress = tmpBrokerAddressMap.get(queue.getBrokerName());
QueueOffset queueOffset = each.getValue();
UpdateConsumerOffsetRequestHeader updateHeader = new UpdateConsumerOffsetRequestHeader();
updateHeader.setTopic(MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%"));
updateHeader.setConsumerGroup(MixAll.LMQ_PREFIX + clientId);
updateHeader.setQueueId((int) queue.getQueueId());
updateHeader.setCommitOffset(queueOffset.getOffset());
defaultMQPullConsumer
.getDefaultMQPullConsumerImpl()
.getRebalanceImpl()
.getmQClientFactory()
.getMQClientAPIImpl().updateConsumerOffset(brokerAddress, updateHeader, 1000);
} catch (Exception e) {
logger.error("", e);
}
}
}
}