in src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java [260:400]
/**
 * Builds the first page of a time-ranged message query for a topic.
 *
 * <p>The method starts a short-lived pull consumer, determines for every message queue of
 * {@code query.getTopic()} the offset window matching {@code [query.getBegin(), query.getEnd()]},
 * counts the messages in those windows, and then pulls the messages belonging to the first page.
 * The consumer is always shut down before returning.
 *
 * @param query the paged query; its topic, begin/end timestamps, page size and page request are read
 * @return a task holding the first {@code Page} of messages plus the per-queue offset bookkeeping
 * @throws RuntimeException if any client call fails; unchecked exceptions are rethrown as-is and
 *                          checked ones are wrapped with the original as cause
 */
private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
    // An ACL hook is only installed when both an access key and a secret key are configured.
    boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
    RPCHook rpcHook = null;
    if (isEnableAcl) {
        rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
    }
    DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());

    // Maximum number of messages requested from the broker per pull.
    final int pullBatchSize = 32;

    long total = 0;
    List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
    List<MessageView> messageViews = new ArrayList<>();
    try {
        consumer.start();
        Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic());

        // Initial offset window per queue, derived from the broker's timestamp lookup.
        // The "+ 1" makes the end bound exclusive in the arithmetic below (end - start is a count).
        int idx = 0;
        for (MessageQueue messageQueue : messageQueues) {
            Long minOffset = consumer.searchOffset(messageQueue, query.getBegin());
            Long maxOffset = consumer.searchOffset(messageQueue, query.getEnd()) + 1;
            queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue));
        }

        // check first offset has message
        // filter the begin time: walk forward past every message stored before query.getBegin()
        for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
            long start = queueOffset.getStart();
            boolean hasData = false;
            boolean hasIllegalOffset = true;
            while (hasIllegalOffset) {
                PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", start, pullBatchSize);
                if (pullResult.getPullStatus() == PullStatus.FOUND) {
                    hasData = true;
                    List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
                    for (MessageExt messageExt : msgFoundList) {
                        if (messageExt.getStoreTimestamp() < query.getBegin()) {
                            start++;
                        } else {
                            // First message inside the time range: the start offset is settled.
                            hasIllegalOffset = false;
                            break;
                        }
                    }
                } else {
                    hasIllegalOffset = false;
                }
            }
            if (!hasData) {
                // Nothing could be pulled at all: collapse the window so the queue counts as empty.
                queueOffset.setEnd(queueOffset.getStart());
            }
            queueOffset.setStart(start);
            queueOffset.setStartOffset(start);
            queueOffset.setEndOffset(start);
        }

        // filter the end time: walk backward from the end, dropping messages stored after query.getEnd()
        for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
            if (queueOffset.getStart().equals(queueOffset.getEnd())) {
                continue;
            }
            long end = queueOffset.getEnd();
            long pullOffset = end;
            int pullSize = pullBatchSize;
            boolean hasIllegalOffset = true;
            while (hasIllegalOffset) {
                // Step one batch backwards, or clamp to the start of the window for the last batch.
                if (pullOffset - pullSize > queueOffset.getStart()) {
                    pullOffset = pullOffset - pullSize;
                } else {
                    pullOffset = queueOffset.getStartOffset();
                    pullSize = (int) (end - pullOffset);
                }
                PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", pullOffset, pullSize);
                if (pullResult.getPullStatus() == PullStatus.FOUND) {
                    List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
                    // Scan newest-first so trailing out-of-range messages are trimmed one by one.
                    for (int i = msgFoundList.size() - 1; i >= 0; i--) {
                        MessageExt messageExt = msgFoundList.get(i);
                        if (messageExt.getStoreTimestamp() > query.getEnd()) {
                            end--;
                        } else {
                            hasIllegalOffset = false;
                            break;
                        }
                    }
                } else {
                    hasIllegalOffset = false;
                }
                // The batch that begins at the window start is the last one to inspect.
                if (pullOffset == queueOffset.getStartOffset()) {
                    break;
                }
            }
            queueOffset.setEnd(end);
            total += queueOffset.getEnd() - queueOffset.getStart();
        }

        // A page can never hold more messages than exist in total.
        long pageSize = total > query.getPageSize() ? query.getPageSize() : total;

        // move startOffset
        int next = moveStartOffset(queueOffsetInfos, query);
        moveEndOffset(queueOffsetInfos, query, next);

        // find the first page of message
        for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
            long start = queueOffsetInfo.getStartOffset();
            long end = queueOffsetInfo.getEndOffset();
            long size = Math.min(end - start, pageSize);
            if (size == 0) {
                continue;
            }
            while (size > 0) {
                PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, pullBatchSize);
                if (pullResult.getPullStatus() != PullStatus.FOUND) {
                    break;
                }
                List<MessageExt> poll = pullResult.getMsgFoundList();
                if (poll.isEmpty()) {
                    break;
                }
                for (MessageExt messageExt : poll) {
                    if (size <= 0) {
                        break;
                    }
                    messageViews.add(MessageView.fromMessageExt(messageExt));
                    size--;
                }
                // Advance past the batch just consumed. Without this the same batch would be
                // pulled again and duplicated whenever more than one batch is needed.
                long nextOffset = pullResult.getNextBeginOffset();
                if (nextOffset <= start) {
                    // No forward progress reported; stop instead of looping on the same offset.
                    break;
                }
                start = nextOffset;
            }
        }
        PageImpl<MessageView> page = new PageImpl<>(messageViews, query.page(), total);
        return new MessagePageTask(page, queueOffsetInfos);
    } catch (Exception e) {
        Throwables.throwIfUnchecked(e);
        throw new RuntimeException(e);
    } finally {
        consumer.shutdown();
    }
}