in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java [334:367]
private void pull(String lmqTopic, MessageQueue mq, long offset, int maxNums, PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
try {
int sysFlag = PullSysFlag.buildSysFlag(false, false, true, false);
long timeoutMillis = 3000L;
pullKernelImpl(
lmqTopic,
mq,
"*",
"TAG",
0L,
offset,
maxNums,
sysFlag,
0,
5000L,
timeoutMillis,
CommunicationMode.ASYNC,
new PullCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.consumer.PullResult pullResult) {
org.apache.rocketmq.client.consumer.PullResult userPullResult = pullAPIWrapper.processPullResult(mq, pullResult, new SubscriptionData());
pullCallback.onSuccess(userPullResult);
}
@Override
public void onException(Throwable e) {
pullCallback.onException(e);
}
});
} catch (MQBrokerException e) {
throw new MQClientException("pullAsync unknow exception", e);
}
}