private MessagePageTask queryFirstMessagePage()

in src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java [260:400]


    private MessagePageTask queryFirstMessagePage(MessageQueryByPage query) {
        boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
        RPCHook rpcHook = null;
        if (isEnableAcl) {
            rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
        }
        DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());

        long total = 0;
        List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();

        List<MessageView> messageViews = new ArrayList<>();

        try {
            consumer.start();
            Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic());
            int idx = 0;
            for (MessageQueue messageQueue : messageQueues) {
                Long minOffset = consumer.searchOffset(messageQueue, query.getBegin());
                Long maxOffset = consumer.searchOffset(messageQueue, query.getEnd());
                queueOffsetInfos.add(new QueueOffsetInfo(idx++, minOffset, maxOffset, minOffset, minOffset, messageQueue));
            }

            // check first offset has message
            // filter the begin time
            for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
                Long start = queueOffset.getStart();
                boolean hasData = false;
                boolean hasIllegalOffset = true;
                while (hasIllegalOffset) {
                    PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", start, 32);
                    if (pullResult.getPullStatus() == PullStatus.FOUND) {
                        hasData = true;
                        List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
                        for (MessageExt messageExt : msgFoundList) {
                            if (messageExt.getStoreTimestamp() < query.getBegin()) {
                                start++;
                            } else {
                                hasIllegalOffset = false;
                                break;
                            }
                        }
                    } else {
                        hasIllegalOffset = false;
                    }
                }
                if (!hasData) {
                    queueOffset.setEnd(queueOffset.getStart());
                }
                queueOffset.setStart(start);
                queueOffset.setStartOffset(start);
                queueOffset.setEndOffset(start);
            }

            // filter the end time
            for (QueueOffsetInfo queueOffset : queueOffsetInfos) {
                if (queueOffset.getStart().equals(queueOffset.getEnd())) {
                    continue;
                }
                long end = queueOffset.getEnd();
                long pullOffset = end;
                int pullSize = 32;
                boolean hasIllegalOffset = true;
                while (hasIllegalOffset) {

                    if (pullOffset - pullSize > queueOffset.getStart()) {
                        pullOffset = pullOffset - pullSize;
                    } else {
                        pullOffset = queueOffset.getStartOffset();
                        pullSize = (int) (end - pullOffset);
                    }
                    PullResult pullResult = consumer.pull(queueOffset.getMessageQueues(), "*", pullOffset, pullSize);
                    if (pullResult.getPullStatus() == PullStatus.FOUND) {
                        List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
                        for (int i = msgFoundList.size() - 1; i >= 0; i--) {
                            MessageExt messageExt = msgFoundList.get(i);
                            if (messageExt.getStoreTimestamp() > query.getEnd()) {
                                end--;
                            } else {
                                hasIllegalOffset = false;
                                break;
                            }
                        }
                    } else {
                        hasIllegalOffset = false;
                    }
                    if (pullOffset == queueOffset.getStartOffset()) {
                        break;
                    }
                }
                queueOffset.setEnd(end);
                total += queueOffset.getEnd() - queueOffset.getStart();
            }

            long pageSize = total > query.getPageSize() ? query.getPageSize() : total;


            // move startOffset
            int next = moveStartOffset(queueOffsetInfos, query);
            moveEndOffset(queueOffsetInfos, query, next);

            // find the first page of message
            for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
                Long start = queueOffsetInfo.getStartOffset();
                Long end = queueOffsetInfo.getEndOffset();
                long size = Math.min(end - start, pageSize);
                if (size == 0) {
                    continue;
                }

                while (size > 0) {
                    PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", start, 32);
                    if (pullResult.getPullStatus() == PullStatus.FOUND) {
                        List<MessageExt> poll = pullResult.getMsgFoundList();
                        if (poll.size() == 0) {
                            break;
                        }
                        List<MessageView> collect = poll.stream()
                                .map(MessageView::fromMessageExt).collect(Collectors.toList());

                        for (MessageView view : collect) {
                            if (size > 0) {
                                messageViews.add(view);
                                size--;
                            }
                        }
                    } else {
                        break;
                    }

                }
            }
            PageImpl<MessageView> page = new PageImpl<>(messageViews, query.page(), total);
            return new MessagePageTask(page, queueOffsetInfos);
        } catch (Exception e) {
            Throwables.throwIfUnchecked(e);
            throw new RuntimeException(e);
        } finally {
            consumer.shutdown();
        }
    }