in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java [52:105]
public Response onReadRequest(ReadRequest request) {
try {
MqttStateMachine sm = server.getMqttStateMachine(request.getGroup());
if (sm == null) {
logger.error("Fail to process RetainedMsg ReadRequest , Not Found SM for {}", request.getGroup());
return null;
}
String topic = request.getExtDataMap().get("topic");
String firstTopic = request.getExtDataMap().get("firstTopic");
String operation = request.getOperation();
logger.info("FirstTopic:{} Topic:{} Operation:{}", firstTopic, topic, operation);
if (operation.equals("topic")) { //return retained msg
return get(sm.getRocksDBEngine(), topic.getBytes(StandardCharsets.UTF_8));
} else { //return retain msgs of matched Topic
String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
if (!retainedMsgTopicTrie.containsKey(wrapTrieFirstTopic)) {
Trie<String, String> newTrie = new Trie<>();
byte[] value = getRdb(sm.getRocksDBEngine(), wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8));
if (value != null) {
newTrie = JSON.parseObject(new String(value, StandardCharsets.UTF_8), Trie.class);
}
retainedMsgTopicTrie.put(wrapTrieFirstTopic, newTrie);
return Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(new ArrayList<byte[]>())))
.build();
}
Trie<String, String> tmpTrie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
Set<String> matchTopics = tmpTrie.getAllPath(topic);
ArrayList<ByteString> msgResults = new ArrayList<>();
for (String tmpTopic : matchTopics) {
byte[] value = getRdb(sm.getRocksDBEngine(), tmpTopic.getBytes(StandardCharsets.UTF_8));
if (value != null) {
msgResults.add(ByteString.copyFrom(value));
}
}
return Response.newBuilder()
.setSuccess(true)
.addAllDatalist(msgResults)//return retained msgs of matched Topic
.build();
}
} catch (Exception e) {
logger.error("", e);
return Response.newBuilder()
.setSuccess(false)
.setErrMsg(e.getMessage())
.build();
}
}