public void GetRetainedMsgsFromTrie()

in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java [98:147]


    /**
     * Asynchronously queries the meta service for retained messages and delivers
     * the outcome through {@code future}.
     *
     * <p>A read request with operation {@code "trie"} and category
     * {@code CATEGORY_RETAINED_MSG} is sent to the leader endpoint of the group
     * returned by {@code whichGroup()}, carrying {@code firstTopic} and
     * {@code topic} as extension data. This method returns as soon as the request
     * has been handed to the RPC client; the result arrives in the callback.
     *
     * <p>The future is completed with:
     * <ul>
     *   <li>the list of decoded messages (possibly empty) when the call succeeds;</li>
     *   <li>{@code null} when the RPC reports an error, the response is missing or
     *       marked unsuccessful, or any returned entry cannot be decoded.</li>
     * </ul>
     * The future is never completed exceptionally by this method.
     *
     * @param firstTopic value sent under the {@code "firstTopic"} extension key
     * @param topic      value sent under the {@code "topic"} extension key
     * @param future     receives the decoded messages, or {@code null} on failure
     * @throws RemotingException    declared by the underlying async invocation;
     *                              in that case {@code future} is left untouched
     * @throws InterruptedException declared by the underlying async invocation;
     *                              in that case {@code future} is left untouched
     */
    public void GetRetainedMsgsFromTrie(String firstTopic, String topic, CompletableFuture<ArrayList<Message>> future) throws RemotingException, InterruptedException {
        String groupId = whichGroup();
        HashMap<String, String> option = new HashMap<>();

        option.put("firstTopic", firstTopic);
        option.put("topic", topic);

        // Placeholder form avoids building the string when debug logging is off.
        logger.debug("GetRetainedMsgsFromTrie option:{}", option);

        final ReadRequest request = ReadRequest.newBuilder()
                .setGroup(groupId)
                .setOperation("trie")
                .setType(READ_INDEX_TYPE)
                .putAllExtData(option)
                .setCategory(CATEGORY_RETAINED_MSG)
                .build();

        // NOTE(review): 5000 is the invokeAsync timeout argument, presumably milliseconds - confirm.
        metaRpcClient.getCliClientService().getRpcClient().invokeAsync(metaRpcClient.getLeader(groupId).getEndpoint(), request, new InvokeCallback() {
            @Override
            public void complete(Object result, Throwable err) {
                if (err != null) {
                    logger.error("GetRetainedMsgsFromTrie rpc failed.", err);
                    future.complete(null);
                    return;
                }
                // Guard the cast: a null or foreign result would otherwise throw here
                // and leave the future uncompleted forever.
                if (!(result instanceof Response)) {
                    logger.error("GetRetainedMsgsFromTrie got unexpected result: {}", result);
                    future.complete(null);
                    return;
                }
                Response rsp = (Response) result;
                if (!rsp.getSuccess()) {
                    logger.error("GetRetainedTopicTrie failed. {}", rsp.getErrMsg());
                    future.complete(null);
                    return;
                }
                List<ByteString> datalistList = rsp.getDatalistList();
                ArrayList<Message> resultList = new ArrayList<>(datalistList.size());
                for (ByteString tmp : datalistList) {
                    try {
                        resultList.add(Message.copyFromStoreMessage(StoreMessage.parseFrom(tmp.toByteArray())));
                    } catch (InvalidProtocolBufferException e) {
                        // Report the failure through the future only; throwing from the
                        // callback would surface in the RPC callback thread, not the caller.
                        logger.error("GetRetainedMsgsFromTrie failed to parse stored message.", e);
                        future.complete(null);
                        return;
                    }
                }
                future.complete(resultList);
            }

            @Override
            public Executor executor() {
                // NOTE(review): null presumably tells the RPC client to use its own
                // default callback executor - confirm against the client implementation.
                return null;
            }
        }, 5000);
    }