private Message toLmqMessage()

in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java [126:159]


    private Message toLmqMessage(Queue queue, MessageExt mqMessage) {
        Message message = new Message();
        message.setMsgId(mqMessage.getMsgId());
        message.setOffset(parseLmqOffset(queue, mqMessage));
        message.setEmpty(Boolean.parseBoolean(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_ISEMPTY_MSG)));
        if (StringUtils.isNotBlank(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC))) {
            message.setOriginTopic(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC));
        } else if (StringUtils.isNotBlank(mqMessage.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
            // maybe from rmq
            String s = mqMessage.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
            String[] lmqSet = s.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
            for (String lmq : lmqSet) {
                if (TopicUtils.isWildCard(lmq)) {
                    continue;
                }
                String originQueue = lmq.replace(MixAll.LMQ_PREFIX, "");
                message.setOriginTopic(StringUtils.replace(originQueue, "%","/"));
            }
        }
        message.setFirstTopic(mqMessage.getTopic());
        message.setPayload(mqMessage.getBody());
        message.setStoreTimestamp(mqMessage.getStoreTimestamp());
        message.setBornTimestamp(mqMessage.getBornTimestamp());
        if (StringUtils.isNotBlank(mqMessage.getUserProperty(Constants.PROPERTY_MQTT_RETRY_TIMES))) {
            message.setRetry(Integer.parseInt(mqMessage.getUserProperty(Constants.PROPERTY_MQTT_RETRY_TIMES)));
        }
        if (StringUtils.isNotBlank(mqMessage.getUserProperty(Constants.PROPERTY_MQTT_EXT_DATA))) {
            message.getUserProperties().putAll(
                    JSONObject.parseObject(mqMessage.getUserProperty(Constants.PROPERTY_MQTT_EXT_DATA),
                            new TypeReference<Map<String, String>>() {
                            }));
        }
        return message;
    }