in src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java [402:468]
private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<QueueOffsetInfo> queueOffsetInfos) {
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
RPCHook rpcHook = null;
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
List<MessageView> messageViews = new ArrayList<>();
long offset = query.getPageNum() * query.getPageSize();
long total = 0;
try {
consumer.start();
for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
long start = queueOffsetInfo.getStart();
long end = queueOffsetInfo.getEnd();
queueOffsetInfo.setStartOffset(start);
queueOffsetInfo.setEndOffset(start);
total += end - start;
}
if (total <= offset) {
return Page.empty();
}
long pageSize = total - offset > query.getPageSize() ? query.getPageSize() : total - offset;
int next = moveStartOffset(queueOffsetInfos, query);
moveEndOffset(queueOffsetInfos, query, next);
for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
Long start = queueOffsetInfo.getStartOffset();
Long end = queueOffsetInfo.getEndOffset();
long size = Math.min(end - start, pageSize);
if (size == 0) {
continue;
}
while (size > 0) {
PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32);
if (pullResult.getPullStatus() == PullStatus.FOUND) {
List<MessageExt> poll = pullResult.getMsgFoundList();
if (poll.size() == 0) {
break;
}
List<MessageView> collect = poll.stream()
.map(MessageView::fromMessageExt).collect(Collectors.toList());
for (MessageView view : collect) {
if (size > 0) {
messageViews.add(view);
size--;
}
}
} else {
break;
}
}
}
return new PageImpl<>(messageViews, query.page(), total);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
consumer.shutdown();
}
}