in client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java [323:488]
public QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, long end,
boolean isUniqKey) throws MQClientException,
InterruptedException {
boolean isLmq = MixAll.isLmq(topic);
String routeTopic = topic;
// if topic is lmq ,then use clusterName as lmq parent topic
// Use clusterName or lmq parent topic to get topic route for lmq or rmq_sys_wheel_timer
if (!StringUtils.isEmpty(topic) && (isLmq || topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"))
&& !StringUtils.isEmpty(clusterName)) {
routeTopic = clusterName;
}
TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
if (null == topicRouteData) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(routeTopic);
topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
}
if (topicRouteData != null) {
List<String> brokerAddrs = new LinkedList<>();
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
if (!isLmq && clusterName != null && !clusterName.isEmpty()
&& !clusterName.equals(brokerData.getCluster())) {
continue;
}
String addr = brokerData.selectBrokerAddr();
if (addr != null) {
brokerAddrs.add(addr);
}
}
if (!brokerAddrs.isEmpty()) {
final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());
final List<QueryResult> queryResultList = new LinkedList<>();
final ReadWriteLock lock = new ReentrantReadWriteLock(false);
for (String addr : brokerAddrs) {
try {
QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
if (isLmq) {
requestHeader.setTopic(clusterName);
} else {
requestHeader.setTopic(topic);
}
requestHeader.setKey(key);
requestHeader.setMaxNum(maxNum);
requestHeader.setBeginTimestamp(begin);
requestHeader.setEndTimestamp(end);
this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
}
@Override
public void operationSucceed(RemotingCommand response) {
try {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryMessageResponseHeader responseHeader = null;
try {
responseHeader =
(QueryMessageResponseHeader) response
.decodeCommandCustomHeader(QueryMessageResponseHeader.class);
} catch (RemotingCommandException e) {
log.error("decodeCommandCustomHeader exception", e);
return;
}
List<MessageExt> wrappers =
MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
try {
lock.writeLock().lock();
queryResultList.add(qr);
} finally {
lock.writeLock().unlock();
}
break;
}
default:
log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
break;
}
} finally {
countDownLatch.countDown();
}
}
@Override
public void operationFail(Throwable throwable) {
log.error("queryMessage error, requestHeader={}", requestHeader);
countDownLatch.countDown();
}
}, isUniqKey);
} catch (Exception e) {
log.warn("queryMessage exception", e);
}
}
boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS);
if (!ok) {
log.warn("queryMessage, maybe some broker failed");
}
long indexLastUpdateTimestamp = 0;
List<MessageExt> messageList = new LinkedList<>();
for (QueryResult qr : queryResultList) {
if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) {
indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp();
}
for (MessageExt msgExt : qr.getMessageList()) {
if (isUniqKey) {
if (msgExt.getMsgId().equals(key)) {
messageList.add(msgExt);
} else {
log.warn("queryMessage by uniqKey, find message key not matched, maybe hash duplicate {}", msgExt.toString());
}
} else {
String keys = msgExt.getKeys();
String msgTopic = msgExt.getTopic();
if (keys != null) {
boolean matched = false;
String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
for (String k : keyArray) {
// both topic and key must be equal at the same time
if (Objects.equals(key, k) && (isLmq || Objects.equals(topic, msgTopic))) {
matched = true;
break;
}
}
if (matched) {
messageList.add(msgExt);
} else {
log.warn("queryMessage, find message key not matched, maybe hash duplicate {}", msgExt.toString());
}
}
}
}
}
//If namespace not null , reset Topic without namespace.
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
for (MessageExt messageExt : messageList) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.mQClientFactory.getClientConfig().getNamespace()));
}
}
if (!messageList.isEmpty()) {
return new QueryResult(indexLastUpdateTimestamp, messageList);
} else {
throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message.");
}
}
}
throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
}