in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java [62:104]
/**
 * Handles an inbound MQTT PUBLISH packet.
 *
 * <p>The topic from the variable header is normalized, its first-level topic is checked via
 * {@code firstTopicManager}, and the queues matching the topic are looked up through
 * {@code wildcardManager}. If the RETAIN flag is set, a copy of the message that still carries
 * the flag is handed to {@code retainedPersistManager}, while the message written to the queues
 * has the flag removed. Both messages share the same generated message id and born timestamp.
 *
 * @param context     upstream context of the publishing client; its namespace is used for queue matching
 * @param mqttMessage the incoming message; must be an {@link MqttPublishMessage}
 * @return a future produced by {@code HookResult.newHookResult(HookResult.SUCCESS, null, ...)} whose
 *         payload is the JSON-serialized {@code StoreResult} of the queue write. A failure while
 *         storing the retained message is only logged and does not affect this future.
 * @throws ClassCastException if {@code mqttMessage} is not an {@link MqttPublishMessage}
 */
public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage mqttMessage) {
    MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;

    // Deal with an empty payload. readableBytes() gives the same answer as copying the payload
    // into a byte[] and checking its length, without allocating on every publish.
    boolean isEmpty = mqttPublishMessage.content().readableBytes() == 0;
    if (isEmpty) {
        mqttPublishMessage = MessageUtil.dealEmptyMessage(mqttPublishMessage);
    }

    String msgId = MessageClientIDSetter.createUniqID();
    MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
    String originTopic = variableHeader.topicName();
    String pubTopic = TopicUtils.normalizeTopic(originTopic);
    MqttTopic mqttTopic = TopicUtils.decode(pubTopic);

    // Check that the first-level topic exists.
    firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic());
    // Find the queues that correspond to the published topic.
    Set<String> queueNames = wildcardManager.matchQueueSetByMsgTopic(pubTopic, context.getNamespace());
    long bornTime = System.currentTimeMillis();

    if (mqttPublishMessage.fixedHeader().isRetain()) {
        // Take the copy before the flag is removed so the stored meta message keeps RETAIN = 1.
        // NOTE(review): this copy is never released in this method; confirm whether
        // MessageUtil.toMessage or the buffer's allocator makes an explicit release unnecessary.
        MqttPublishMessage retainedMqttPublishMessage = mqttPublishMessage.copy();
        // The message sent on to MQ carries RETAIN = 0.
        mqttPublishMessage = MessageUtil.removeRetainedFlag(mqttPublishMessage);

        Message metaMessage = MessageUtil.toMessage(retainedMqttPublishMessage);
        metaMessage.setMsgId(msgId);
        metaMessage.setBornTimestamp(bornTime);
        metaMessage.setEmpty(isEmpty);

        CompletableFuture<Boolean> storeRetainedFuture = retainedPersistManager.storeRetainedMessage(
                TopicUtils.normalizeTopic(metaMessage.getOriginTopic()), metaMessage);
        // Best effort: the outcome is not chained into the returned future, failures are only logged.
        storeRetainedFuture.whenComplete((res, throwable) -> {
            if (throwable != null) {
                // The throwable is passed as the trailing argument without a placeholder so the
                // logger records it as the exception (with stack trace).
                logger.error("Store topic:{} retained message error.", metaMessage.getOriginTopic(), throwable);
            }
        });
    }

    Message message = MessageUtil.toMessage(mqttPublishMessage);
    message.setMsgId(msgId);
    message.setBornTimestamp(bornTime);
    message.setEmpty(isEmpty);

    CompletableFuture<StoreResult> storeResultFuture = lmqQueueStore.putMessage(queueNames, message);
    return storeResultFuture.thenCompose(storeResult -> HookResult.newHookResult(HookResult.SUCCESS, null,
            JSON.toJSONBytes(storeResult)));
}