public CompletableFuture putMessage()

in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java [181:214]


    /**
     * Asynchronously writes one message to every queue in {@code queues} with a single producer send.
     *
     * <p>The message is converted with {@code toMQMessage}, tagged with {@code Constants.MQTT_TAG}, and
     * given the {@code MessageConst.PROPERTY_INNER_MULTI_DISPATCH} user property. That property holds the
     * queue names, each with {@code "/"} replaced by {@code "%"} and prefixed with
     * {@code MixAll.LMQ_PREFIX}, joined with {@code MixAll.MULTI_DISPATCH_QUEUE_SPLITTER}.
     *
     * <p>Failures of the send (reported through the callback or thrown by the {@code send} call itself)
     * and failures while converting the send result are delivered through the returned future.
     * Exceptions raised while preparing the message, before the send is attempted, propagate to the
     * caller directly.
     *
     * @param queues  names of the target queues; dereferenced without a null check
     * @param message message to store
     * @return a future completed with the converted send result, or completed exceptionally when the
     *         send or the result conversion fails
     */
    public CompletableFuture<StoreResult> putMessage(Set<String> queues, Message message) {
        CompletableFuture<StoreResult> result = new CompletableFuture<>();
        org.apache.rocketmq.common.message.Message mqMessage = toMQMessage(message);
        mqMessage.setTags(Constants.MQTT_TAG);
        mqMessage.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
                StringUtils.join(
                        queues.stream().map(s -> MixAll.LMQ_PREFIX + StringUtils.replace(s, "/", "%")).collect(Collectors.toSet()),
                        MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
        // Declared outside the try block so the synchronous-failure path can report a response time too.
        final long start = System.currentTimeMillis();
        try {
            defaultMQProducer.send(mqMessage,
                    new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            // Capture the response time before completing the future: completion may run
                            // dependent stages on this thread, which must not be counted as write latency.
                            long rt = System.currentTimeMillis() - start;
                            try {
                                result.complete(toStoreResult(sendResult));
                            } catch (Throwable e) {
                                // Without this guard a conversion failure would leave the future
                                // uncompleted and the caller waiting forever.
                                logger.error("failed to convert lmq send result, queues=" + queues, e);
                                result.completeExceptionally(e);
                            }
                            // The send callback reported success, so the write is recorded as successful.
                            StatUtil.addInvoke("lmqWrite", rt);
                            collectLmqReadWriteMatchActionRt("lmqWrite", rt, true);
                        }

                        @Override
                        public void onException(Throwable e) {
                            long rt = System.currentTimeMillis() - start;
                            logger.error("failed to put message to lmq, queues=" + queues, e);
                            result.completeExceptionally(e);
                            StatUtil.addInvoke("lmqWrite", rt, false);
                            collectLmqReadWriteMatchActionRt("lmqWrite", rt, false);
                        }
                    });
        } catch (Throwable e) {
            // The send call itself failed; report it the same way as an asynchronous failure.
            long rt = System.currentTimeMillis() - start;
            logger.error("failed to put message to lmq, queues=" + queues, e);
            result.completeExceptionally(e);
            StatUtil.addInvoke("lmqWrite", rt, false);
            collectLmqReadWriteMatchActionRt("lmqWrite", rt, false);
        }
        return result;
    }