in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java [181:214]
/**
 * Sends {@code message} once through {@code defaultMQProducer}, tagging it with
 * {@link Constants#MQTT_TAG} and listing every target queue in the
 * {@link MessageConst#PROPERTY_INNER_MULTI_DISPATCH} user property.
 *
 * <p>Each queue name is prefixed with {@link MixAll#LMQ_PREFIX} and has every {@code "/"}
 * replaced by {@code "%"}; the resulting names are joined with
 * {@link MixAll#MULTI_DISPATCH_QUEUE_SPLITTER}.
 *
 * <p>Failures of the send itself (thrown by {@code send} or reported through the callback)
 * complete the returned future exceptionally. Failures while building the outgoing message
 * (for example a {@code null} {@code queues}) are still thrown synchronously to the caller.
 *
 * @param queues  names of the queues the message must be dispatched to
 * @param message the message to store
 * @return a future completed with the converted send result, or completed exceptionally
 *         with the error that prevented the send or the result conversion
 */
public CompletableFuture<StoreResult> putMessage(Set<String> queues, Message message) {
    CompletableFuture<StoreResult> result = new CompletableFuture<>();
    org.apache.rocketmq.common.message.Message mqMessage = toMQMessage(message);
    mqMessage.setTags(Constants.MQTT_TAG);
    mqMessage.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
        StringUtils.join(
            queues.stream().map(s -> MixAll.LMQ_PREFIX + StringUtils.replace(s, "/", "%")).collect(Collectors.toSet()),
            MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
    // Taken outside the try block so the synchronous-failure path can report its latency too.
    final long start = System.currentTimeMillis();
    try {
        defaultMQProducer.send(mqMessage,
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    try {
                        result.complete(toStoreResult(sendResult));
                    } catch (Throwable t) {
                        // A conversion failure must not leave the future pending forever.
                        logger.error("lmq putMessage: failed to convert send result", t);
                        result.completeExceptionally(t);
                    }
                    long rt = System.currentTimeMillis() - start;
                    StatUtil.addInvoke("lmqWrite", rt);
                    collectLmqReadWriteMatchActionRt("lmqWrite", rt, true);
                }

                @Override
                public void onException(Throwable e) {
                    logger.error("lmq putMessage: send failed", e);
                    result.completeExceptionally(e);
                    long rt = System.currentTimeMillis() - start;
                    StatUtil.addInvoke("lmqWrite", rt, false);
                    collectLmqReadWriteMatchActionRt("lmqWrite", rt, false);
                }
            });
    } catch (Throwable e) {
        // send() itself threw, so the callback will not run: log and record the failure
        // here, in the same way onException does.
        logger.error("lmq putMessage: send could not be submitted", e);
        result.completeExceptionally(e);
        long rt = System.currentTimeMillis() - start;
        StatUtil.addInvoke("lmqWrite", rt, false);
        collectLmqReadWriteMatchActionRt("lmqWrite", rt, false);
    }
    return result;
}