public QueryResult queryMessage()

in client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java [323:488]


    /**
     * Queries messages by key (or by unique message id) on every selected broker and merges the replies.
     *
     * <p>One asynchronous query request is sent per broker address. The calling thread then waits up to
     * {@code timeoutMillis * 4} milliseconds for the replies, filters the returned messages so that only real
     * matches remain (index lookups may return hash collisions), and strips the namespace from the topic of
     * each returned message when the client is configured with one.
     *
     * @param clusterName for an LMQ topic this value is used both as the route topic and as the topic sent to
     *                    the broker; for the wheel-timer system topic it is used as the route topic; otherwise,
     *                    when non-empty, only brokers belonging to this cluster are queried
     * @param topic       topic the messages belong to
     * @param key         message key, or the unique message id when {@code isUniqKey} is {@code true}
     * @param maxNum      maximum number of messages, forwarded in each broker request
     * @param begin       begin timestamp forwarded in each broker request
     * @param end         end timestamp forwarded in each broker request
     * @param isUniqKey   {@code true} to match on the message id, {@code false} to match on the message keys
     * @return the matched messages together with the largest index-last-update timestamp reported by a broker
     * @throws MQClientException    with {@code ResponseCode.NO_MESSAGE} when the query finished without a
     *                              matching message, or with {@code ResponseCode.TOPIC_NOT_EXIST} when no
     *                              route data or no broker address could be resolved
     * @throws InterruptedException if the calling thread is interrupted while waiting for the replies
     */
    public QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, long end,
        boolean isUniqKey) throws MQClientException,
        InterruptedException {
        boolean isLmq = MixAll.isLmq(topic);

        String routeTopic = topic;
        // If the topic is an LMQ, clusterName is used as the LMQ parent topic.
        // Use clusterName or the LMQ parent topic to get the topic route for LMQ or rmq_sys_wheel_timer.
        if (!StringUtils.isEmpty(topic) && (isLmq || topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"))
            && !StringUtils.isEmpty(clusterName)) {
            routeTopic = clusterName;
        }

        TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
        if (null == topicRouteData) {
            // Route not cached yet: refresh it once from the name server and look it up again.
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(routeTopic);
            topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
        }

        if (topicRouteData != null) {
            List<String> brokerAddrs = new LinkedList<>();
            for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
                // The cluster filter is skipped for LMQ because clusterName carries the parent topic there.
                if (!isLmq && clusterName != null && !clusterName.isEmpty()
                    && !clusterName.equals(brokerData.getCluster())) {
                    continue;
                }
                String addr = brokerData.selectBrokerAddr();
                if (addr != null) {
                    brokerAddrs.add(addr);
                }
            }

            if (!brokerAddrs.isEmpty()) {
                final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());
                final List<QueryResult> queryResultList = new LinkedList<>();
                final ReadWriteLock lock = new ReentrantReadWriteLock(false);

                for (String addr : brokerAddrs) {
                    // Guarantees the latch is released exactly once for this broker, no matter whether the
                    // request fails while being dispatched or completes through the callback.
                    final java.util.concurrent.atomic.AtomicBoolean latchReleased =
                        new java.util.concurrent.atomic.AtomicBoolean(false);
                    try {
                        final QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
                        if (isLmq) {
                            requestHeader.setTopic(clusterName);
                        } else {
                            requestHeader.setTopic(topic);
                        }
                        requestHeader.setKey(key);
                        requestHeader.setMaxNum(maxNum);
                        requestHeader.setBeginTimestamp(begin);
                        requestHeader.setEndTimestamp(end);

                        this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
                            new InvokeCallback() {
                                @Override
                                public void operationComplete(ResponseFuture responseFuture) {

                                }

                                @Override
                                public void operationSucceed(RemotingCommand response) {
                                    try {
                                        switch (response.getCode()) {
                                            case ResponseCode.SUCCESS: {
                                                QueryMessageResponseHeader responseHeader = null;
                                                try {
                                                    responseHeader =
                                                        (QueryMessageResponseHeader) response
                                                            .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
                                                } catch (RemotingCommandException e) {
                                                    log.error("decodeCommandCustomHeader exception", e);
                                                    return;
                                                }

                                                List<MessageExt> wrappers =
                                                    MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);

                                                QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
                                                // Acquire outside the try so unlock() only runs when the lock is held.
                                                lock.writeLock().lock();
                                                try {
                                                    queryResultList.add(qr);
                                                } finally {
                                                    lock.writeLock().unlock();
                                                }
                                                break;
                                            }
                                            default:
                                                log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
                                                break;
                                        }

                                    } finally {
                                        if (latchReleased.compareAndSet(false, true)) {
                                            countDownLatch.countDown();
                                        }
                                    }
                                }

                                @Override
                                public void operationFail(Throwable throwable) {
                                    // Pass the throwable so the failure cause is not lost.
                                    log.error("queryMessage error, requestHeader={}", requestHeader, throwable);
                                    if (latchReleased.compareAndSet(false, true)) {
                                        countDownLatch.countDown();
                                    }
                                }
                            }, isUniqKey);
                    } catch (Exception e) {
                        log.warn("queryMessage exception", e);
                        if (e instanceof InterruptedException) {
                            // Do not swallow the interrupt; the await below will surface it to the caller.
                            Thread.currentThread().interrupt();
                        }
                        // The request was not dispatched, so no reply is expected for it: release the latch
                        // here, otherwise the caller would always wait for the full timeout.
                        if (latchReleased.compareAndSet(false, true)) {
                            countDownLatch.countDown();
                        }
                    }

                }

                boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS);
                if (!ok) {
                    log.warn("queryMessage, maybe some broker failed");
                }

                // After a timeout, late callbacks may still append to queryResultList, so iterate over a
                // snapshot taken under the read lock rather than over the shared list.
                List<QueryResult> receivedResults;
                lock.readLock().lock();
                try {
                    receivedResults = new LinkedList<>(queryResultList);
                } finally {
                    lock.readLock().unlock();
                }

                long indexLastUpdateTimestamp = 0;
                List<MessageExt> messageList = new LinkedList<>();
                for (QueryResult qr : receivedResults) {
                    if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) {
                        indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp();
                    }

                    for (MessageExt msgExt : qr.getMessageList()) {
                        if (isUniqKey) {
                            if (msgExt.getMsgId().equals(key)) {
                                messageList.add(msgExt);
                            } else {
                                log.warn("queryMessage by uniqKey, find message key not matched, maybe hash duplicate {}", msgExt);
                            }
                        } else {
                            String keys = msgExt.getKeys();
                            String msgTopic = msgExt.getTopic();
                            if (keys != null) {
                                boolean matched = false;
                                String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
                                for (String k : keyArray) {
                                    // both topic and key must be equal at the same time
                                    if (Objects.equals(key, k) && (isLmq || Objects.equals(topic, msgTopic))) {
                                        matched = true;
                                        break;
                                    }
                                }

                                if (matched) {
                                    messageList.add(msgExt);
                                } else {
                                    log.warn("queryMessage, find message key not matched, maybe hash duplicate {}", msgExt);
                                }
                            }
                        }
                    }
                }

                // If the namespace is not null, reset each topic to its form without the namespace.
                final String namespace = this.mQClientFactory.getClientConfig().getNamespace();
                if (null != namespace) {
                    for (MessageExt messageExt : messageList) {
                        messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), namespace));
                    }
                }

                if (!messageList.isEmpty()) {
                    return new QueryResult(indexLastUpdateTimestamp, messageList);
                } else {
                    throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message.");
                }
            }
        }

        throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
    }