in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java [98:147]
/**
 * Asynchronously queries the retained messages matching {@code topic} under {@code firstTopic}.
 *
 * <p>Builds a {@code ReadRequest} with operation {@code "trie"} and sends it to the leader of the
 * group returned by {@code whichGroup()}. The outcome is reported only through {@code future}:
 * <ul>
 *   <li>on success it is completed with the messages parsed from the response data list
 *       (possibly an empty list);</li>
 *   <li>on an RPC error, an unsuccessful response, or an unparsable entry it is completed
 *       with {@code null}.</li>
 * </ul>
 *
 * @param firstTopic value sent as the {@code "firstTopic"} entry of the request's ext data
 * @param topic      value sent as the {@code "topic"} entry of the request's ext data
 * @param future     future completed from the RPC callback with the result list or {@code null}
 * @throws RemotingException    declared for the asynchronous invoke call
 * @throws InterruptedException declared for the asynchronous invoke call
 */
public void GetRetainedMsgsFromTrie(String firstTopic, String topic, CompletableFuture<ArrayList<Message>> future) throws RemotingException, InterruptedException {
    String groupId = whichGroup();

    HashMap<String, String> option = new HashMap<>();
    option.put("firstTopic", firstTopic);
    option.put("topic", topic);
    // Parameterized form avoids building the string when debug logging is disabled.
    logger.debug("GetRetainedMsgsFromTrie option:{}", option);

    final ReadRequest request = ReadRequest.newBuilder()
        .setGroup(groupId)
        .setOperation("trie")
        .setType(READ_INDEX_TYPE)
        .putAllExtData(option)
        .setCategory(CATEGORY_RETAINED_MSG)
        .build();

    metaRpcClient.getCliClientService().getRpcClient().invokeAsync(metaRpcClient.getLeader(groupId).getEndpoint(), request, new InvokeCallback() {
        @Override
        public void complete(Object result, Throwable err) {
            if (err != null) {
                logger.error("GetRetainedMsgsFromTrie rpc failed.", err);
                future.complete(null);
                return;
            }

            Response rsp = (Response) result;
            if (!rsp.getSuccess()) {
                logger.error("GetRetainedMsgsFromTrie failed. {}", rsp.getErrMsg());
                future.complete(null);
                return;
            }

            List<ByteString> datalistList = rsp.getDatalistList();
            ArrayList<Message> resultList = new ArrayList<>();
            for (ByteString tmp : datalistList) {
                try {
                    resultList.add(Message.copyFromStoreMessage(StoreMessage.parseFrom(tmp.toByteArray())));
                } catch (InvalidProtocolBufferException e) {
                    // Report the failure through the future and the log only; rethrowing here
                    // would surface in the RPC callback thread, not in the caller.
                    logger.error("GetRetainedMsgsFromTrie failed to parse stored message.", e);
                    future.complete(null);
                    return;
                }
            }
            future.complete(resultList);
        }

        @Override
        public Executor executor() {
            // NOTE(review): null presumably lets the RPC client pick its default executor - confirm.
            return null;
        }
    }, 5000); // NOTE(review): timeout, presumably in milliseconds - confirm against the RPC client API.
}