public Response onReadRequest()

in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java [52:105]


    /**
     * Serves a retained-message read request against the state machine of the request's group.
     *
     * <p>When the operation is {@code "topic"}, the lookup for the {@code topic} ext-data entry is
     * delegated to {@code get(...)}. For any other operation, the topic trie cached under the
     * wrapped {@code firstTopic} key is consulted (it is loaded from RocksDB and cached in
     * {@code retainedMsgTopicTrie} on first use), and the stored value of every path returned by
     * {@code getAllPath(topic)} is added to the response's data list.
     *
     * @param request read request carrying the group, the operation and the {@code topic} /
     *                {@code firstTopic} ext-data entries
     * @return the lookup result; a response with {@code success=false} and an error message when
     *         no state machine is found for the group or when processing throws
     */
    public Response onReadRequest(ReadRequest request) {
        try {
            MqttStateMachine sm = server.getMqttStateMachine(request.getGroup());
            if (sm == null) {
                logger.error("Fail to process RetainedMsg ReadRequest , Not Found SM for {}", request.getGroup());
                // Report the failure explicitly, like the catch block below, instead of handing
                // the caller a null Response.
                return Response.newBuilder()
                        .setSuccess(false)
                        .setErrMsg("Not Found SM for " + request.getGroup())
                        .build();
            }
            String topic = request.getExtDataMap().get("topic");
            String firstTopic = request.getExtDataMap().get("firstTopic");
            String operation = request.getOperation();

            logger.info("FirstTopic:{} Topic:{} Operation:{}", firstTopic, topic, operation);

            if (operation.equals("topic")) {    //return retained msg
                return get(sm.getRocksDBEngine(), topic.getBytes(StandardCharsets.UTF_8));
            }

            //return retain msgs of matched Topic
            String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
            Trie<String, String> topicTrie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
            if (topicTrie == null) {
                byte[] persistedTrie = getRdb(sm.getRocksDBEngine(), wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8));
                if (persistedTrie == null) {
                    // Nothing persisted for this first topic: cache an empty trie and answer
                    // with an empty result, since no topic can match.
                    retainedMsgTopicTrie.put(wrapTrieFirstTopic, new Trie<>());
                    return Response.newBuilder()
                            .setSuccess(true)
                            .setData(ByteString.copyFrom(JSON.toJSONBytes(new ArrayList<byte[]>())))
                            .build();
                }
                // A trie was restored from storage: cache it and fall through to the normal
                // lookup so the first query after a cache miss still returns the matches.
                topicTrie = JSON.parseObject(new String(persistedTrie, StandardCharsets.UTF_8), Trie.class);
                retainedMsgTopicTrie.put(wrapTrieFirstTopic, topicTrie);
            }

            Set<String> matchTopics = topicTrie.getAllPath(topic);

            ArrayList<ByteString> msgResults = new ArrayList<>();
            for (String matchTopic : matchTopics) {
                byte[] value = getRdb(sm.getRocksDBEngine(), matchTopic.getBytes(StandardCharsets.UTF_8));
                // Paths without a stored value are skipped.
                if (value != null) {
                    msgResults.add(ByteString.copyFrom(value));
                }
            }
            return Response.newBuilder()
                    .setSuccess(true)
                    .addAllDatalist(msgResults)//return retained msgs of matched Topic
                    .build();
        } catch (Exception e) {
            logger.error("", e);
            // e.getMessage() may be null (e.g. a NullPointerException from a missing ext-data
            // entry). Response looks like a protobuf message, whose string setters throw on
            // null, so fall back to the exception's string form.
            String errMsg = e.getMessage() != null ? e.getMessage() : e.toString();
            return Response.newBuilder()
                    .setSuccess(false)
                    .setErrMsg(errMsg)
                    .build();
        }
    }