public Response onWriteRequest()

in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java [142:174]


    public Response onWriteRequest(WriteRequest writeRequest) {
        try {
            MqttStateMachine sm = server.getMqttStateMachine(writeRequest.getGroup());
            if (sm == null) {
                logger.error("Fail to process RetainedMsg WriteRequest , Not Found SM for {}", writeRequest.getGroup());
                return null;
            }
            String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("firstTopic"));     //retained msg firstTopic
            String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("topic"));     //retained msg topic
            boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get("isEmpty"));     //retained msg is empty
            byte[] message = writeRequest.getData().toByteArray();
            boolean res = setRetainedMsg(sm.getRocksDBEngine(), firstTopic, topic, isEmpty, message);
            if (!res) {
                logger.warn("Put the topic {} retained message failed! Exceeded maximum number of reserved topics limit.", topic);
                return Response.newBuilder()
                        .setSuccess(false)
                        .setErrMsg("Exceeded maximum number of reserved topics limit.")
                        .build();
            }
            logger.info("Put the topic {} retained message success!", topic);

            return Response.newBuilder()
                    .setSuccess(true)
                    .setData(ByteString.copyFrom(JSON.toJSONBytes(topic)))
                    .build();
        } catch (Exception e) {
            logger.error("Put the retained message error!", e);
            return Response.newBuilder()
                    .setSuccess(false)
                    .setErrMsg(e.getMessage())
                    .build();
        }
    }