in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java [126:159]
/**
 * Builds a {@link Message} from a RocketMQ {@link MessageExt}.
 *
 * <p>Origin topic resolution:
 * <ol>
 *   <li>If the {@code Constants.PROPERTY_ORIGIN_MQTT_TOPIC} user property is non-blank, it is used as is.</li>
 *   <li>Otherwise, if the {@code MessageConst.PROPERTY_INNER_MULTI_DISPATCH} user property is non-blank,
 *       it is split on {@code MixAll.MULTI_DISPATCH_QUEUE_SPLITTER}; every entry that is not a wildcard
 *       has a leading {@code MixAll.LMQ_PREFIX} stripped and each {@code '%'} turned into {@code '/'}.
 *       When several non-wildcard entries are present, the last one wins.</li>
 *   <li>Otherwise the origin topic is left unset.</li>
 * </ol>
 *
 * @param queue     queue handed to {@code parseLmqOffset} together with the message to compute the offset
 * @param mqMessage source message whose id, topic, body, timestamps and user properties are copied
 * @return the populated message
 * @throws NumberFormatException if the {@code Constants.PROPERTY_MQTT_RETRY_TIMES} user property is
 *                               non-blank but is not a parsable integer
 */
private Message toLmqMessage(Queue queue, MessageExt mqMessage) {
    Message message = new Message();
    message.setMsgId(mqMessage.getMsgId());
    message.setOffset(parseLmqOffset(queue, mqMessage));
    message.setEmpty(Boolean.parseBoolean(mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_ISEMPTY_MSG)));

    String originTopic = mqMessage.getUserProperty(Constants.PROPERTY_ORIGIN_MQTT_TOPIC);
    if (StringUtils.isNotBlank(originTopic)) {
        message.setOriginTopic(originTopic);
    } else {
        // maybe from rmq
        String multiDispatch = mqMessage.getUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
        if (StringUtils.isNotBlank(multiDispatch)) {
            for (String lmq : multiDispatch.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)) {
                if (TopicUtils.isWildCard(lmq)) {
                    continue;
                }
                // Strip the prefix only where it leads the name; String.replace would also delete
                // any later occurrence of the same characters inside the queue name itself.
                String originQueue = lmq.startsWith(MixAll.LMQ_PREFIX)
                    ? lmq.substring(MixAll.LMQ_PREFIX.length())
                    : lmq;
                message.setOriginTopic(StringUtils.replace(originQueue, "%", "/"));
            }
        }
    }

    message.setFirstTopic(mqMessage.getTopic());
    message.setPayload(mqMessage.getBody());
    message.setStoreTimestamp(mqMessage.getStoreTimestamp());
    message.setBornTimestamp(mqMessage.getBornTimestamp());

    String retryTimes = mqMessage.getUserProperty(Constants.PROPERTY_MQTT_RETRY_TIMES);
    if (StringUtils.isNotBlank(retryTimes)) {
        message.setRetry(Integer.parseInt(retryTimes));
    }

    String extData = mqMessage.getUserProperty(Constants.PROPERTY_MQTT_EXT_DATA);
    if (StringUtils.isNotBlank(extData)) {
        Map<String, String> extProperties = JSONObject.parseObject(extData,
            new TypeReference<Map<String, String>>() {
            });
        // Guard against a null parse result so putAll is never handed null.
        if (extProperties != null) {
            message.getUserProperties().putAll(extProperties);
        }
    }
    return message;
}