private Page queryMessageByTaskPage()

in src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java [402:468]


    /**
     * Loads one page of messages by pulling, from each queue, the offset slice assigned to the requested page.
     *
     * <p>The total element count is the sum of {@code end - start} over all entries of {@code queueOffsetInfos}.
     * Each entry's start/end offsets are first reset to its {@code start}; {@code moveStartOffset} and
     * {@code moveEndOffset} then position them for the requested page. For every queue, up to
     * {@code min(endOffset - startOffset, pageSize)} messages are pulled beginning at {@code startOffset}.
     *
     * @param query            page request; supplies the page number, the page size and the paging descriptor
     *                         handed to the resulting page
     * @param queueOffsetInfos per-queue offset bookkeeping; the startOffset/endOffset of every entry is overwritten
     * @return the page of message views, or {@code Page.empty()} when the page starts at or beyond the total
     * @throws RuntimeException wrapping any checked exception raised while pulling; unchecked exceptions
     *                          propagate unchanged
     */
    private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage query, List<QueueOffsetInfo> queueOffsetInfos) {
        boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
        RPCHook rpcHook = null;
        if (isEnableAcl) {
            rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
        }
        DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
        List<MessageView> messageViews = new ArrayList<>();

        // Widen before multiplying so a large page number cannot overflow int arithmetic.
        long offset = (long) query.getPageNum() * query.getPageSize();
        // Upper bound on the number of messages requested from the broker per pull call.
        final int pullBatchSize = 32;

        long total = 0;
        try {
            consumer.start();
            for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
                long start = queueOffsetInfo.getStart();
                long end = queueOffsetInfo.getEnd();
                // Both cursors start at the queue's beginning; the move* helpers below advance them.
                queueOffsetInfo.setStartOffset(start);
                queueOffsetInfo.setEndOffset(start);
                total += end - start;
            }
            if (total <= offset) {
                return Page.empty();
            }
            // The last page may be shorter than the requested page size.
            long pageSize = total - offset > query.getPageSize() ? query.getPageSize() : total - offset;

            int next = moveStartOffset(queueOffsetInfos, query);
            moveEndOffset(queueOffsetInfos, query, next);

            for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
                long nextOffset = queueOffsetInfo.getStartOffset();
                long endOffset = queueOffsetInfo.getEndOffset();
                long remaining = Math.min(endOffset - nextOffset, pageSize);

                while (remaining > 0) {
                    PullResult pullResult = consumer.pull(queueOffsetInfo.getMessageQueues(), "*", nextOffset, pullBatchSize);
                    if (pullResult.getPullStatus() != PullStatus.FOUND) {
                        break;
                    }
                    List<MessageExt> found = pullResult.getMsgFoundList();
                    if (found == null || found.isEmpty()) {
                        break;
                    }
                    for (MessageExt messageExt : found) {
                        if (remaining <= 0) {
                            break;
                        }
                        messageViews.add(MessageView.fromMessageExt(messageExt));
                        remaining--;
                    }
                    // Continue after the batch just read; re-pulling from the same offset would
                    // return the same messages again and fill the page with duplicates.
                    nextOffset = pullResult.getNextBeginOffset();
                }
            }
            return new PageImpl<>(messageViews, query.page(), total);
        } catch (Exception e) {
            Throwables.throwIfUnchecked(e);
            throw new RuntimeException(e);
        } finally {
            consumer.shutdown();
        }
    }