in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java [142:174]
public Response onWriteRequest(WriteRequest writeRequest) {
try {
MqttStateMachine sm = server.getMqttStateMachine(writeRequest.getGroup());
if (sm == null) {
logger.error("Fail to process RetainedMsg WriteRequest , Not Found SM for {}", writeRequest.getGroup());
return null;
}
String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("firstTopic")); //retained msg firstTopic
String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("topic")); //retained msg topic
boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get("isEmpty")); //retained msg is empty
byte[] message = writeRequest.getData().toByteArray();
boolean res = setRetainedMsg(sm.getRocksDBEngine(), firstTopic, topic, isEmpty, message);
if (!res) {
logger.warn("Put the topic {} retained message failed! Exceeded maximum number of reserved topics limit.", topic);
return Response.newBuilder()
.setSuccess(false)
.setErrMsg("Exceeded maximum number of reserved topics limit.")
.build();
}
logger.info("Put the topic {} retained message success!", topic);
return Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(topic)))
.build();
} catch (Exception e) {
logger.error("Put the retained message error!", e);
return Response.newBuilder()
.setSuccess(false)
.setErrMsg(e.getMessage())
.build();
}
}