public CompletableFuture pullMessage()

in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java [225:259]


    public CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, long count) {
        CompletableFuture<PullResult> result = new CompletableFuture<>();
        try {
            MessageQueue messageQueue = new MessageQueue(firstTopic, queue.getBrokerName(), (int) queue.getQueueId());
            long start = System.currentTimeMillis();
            String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
            pull(lmqTopic, messageQueue, queueOffset.getOffset(), (int) count, new PullCallback() {
                @Override
                public void onSuccess(org.apache.rocketmq.client.consumer.PullResult pullResult) {
                    result.complete(toLmqPullResult(queue, pullResult));
                    long rt = System.currentTimeMillis() - start;
                    StatUtil.addInvoke("lmqPull", rt);
                    collectLmqReadWriteMatchActionRt("lmqPull", rt, true);
                    StatUtil.addPv(pullResult.getPullStatus().name(), 1);
                    try {
                        MqttMetricsCollector.collectPullStatusTps(1, pullResult.getPullStatus().name());
                    } catch (Throwable e) {
                        logger.error("collect prometheus error", e);
                    }
                }

                @Override
                public void onException(Throwable e) {
                    logger.error("", e);
                    result.completeExceptionally(e);
                    long rt = System.currentTimeMillis() - start;
                    StatUtil.addInvoke("lmqPull", rt, false);
                    collectLmqReadWriteMatchActionRt("lmqPull", rt, false);
                }
            });
        } catch (Throwable e) {
            result.completeExceptionally(e);
        }
        return result;
    }