in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java [225:259]
public CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, long count) {
CompletableFuture<PullResult> result = new CompletableFuture<>();
try {
MessageQueue messageQueue = new MessageQueue(firstTopic, queue.getBrokerName(), (int) queue.getQueueId());
long start = System.currentTimeMillis();
String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
pull(lmqTopic, messageQueue, queueOffset.getOffset(), (int) count, new PullCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.consumer.PullResult pullResult) {
result.complete(toLmqPullResult(queue, pullResult));
long rt = System.currentTimeMillis() - start;
StatUtil.addInvoke("lmqPull", rt);
collectLmqReadWriteMatchActionRt("lmqPull", rt, true);
StatUtil.addPv(pullResult.getPullStatus().name(), 1);
try {
MqttMetricsCollector.collectPullStatusTps(1, pullResult.getPullStatus().name());
} catch (Throwable e) {
logger.error("collect prometheus error", e);
}
}
@Override
public void onException(Throwable e) {
logger.error("", e);
result.completeExceptionally(e);
long rt = System.currentTimeMillis() - start;
StatUtil.addInvoke("lmqPull", rt, false);
collectLmqReadWriteMatchActionRt("lmqPull", rt, false);
}
});
} catch (Throwable e) {
result.completeExceptionally(e);
}
return result;
}