public CompletableFuture process()

in mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java [62:104]


    public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage mqttMessage) {
        MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
        boolean isEmpty = false;
        //deal empty payload
        if (ByteBufUtil.getBytes(mqttPublishMessage.content()).length == 0) {
            mqttPublishMessage = MessageUtil.dealEmptyMessage(mqttPublishMessage);
            isEmpty = true;
        }
        String msgId = MessageClientIDSetter.createUniqID();
        MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
        String originTopic = variableHeader.topicName();
        String pubTopic = TopicUtils.normalizeTopic(originTopic);
        MqttTopic mqttTopic = TopicUtils.decode(pubTopic);
        firstTopicManager.checkFirstTopicIfCreated(mqttTopic.getFirstTopic());      // Check the firstTopic is existed
        Set<String> queueNames = wildcardManager.matchQueueSetByMsgTopic(pubTopic, context.getNamespace()); //According to topic to find queue
        long bornTime = System.currentTimeMillis();

        if (mqttPublishMessage.fixedHeader().isRetain()) {
            MqttPublishMessage retainedMqttPublishMessage = mqttPublishMessage.copy();
            //Change the retained flag of message that will send MQ is 0
            mqttPublishMessage = MessageUtil.removeRetainedFlag(mqttPublishMessage);
            //Keep the retained flag of message that will store meta
            Message metaMessage = MessageUtil.toMessage(retainedMqttPublishMessage);
            metaMessage.setMsgId(msgId);
            metaMessage.setBornTimestamp(bornTime);
            metaMessage.setEmpty(isEmpty);
            CompletableFuture<Boolean> storeRetainedFuture = retainedPersistManager.storeRetainedMessage(TopicUtils.normalizeTopic(metaMessage.getOriginTopic()), metaMessage);
            storeRetainedFuture.whenComplete((res, throwable) -> {
                if (throwable != null) {
                    logger.error("Store topic:{} retained message error.{}", metaMessage.getOriginTopic(), throwable);
                }
            });
        }

        Message message = MessageUtil.toMessage(mqttPublishMessage);
        message.setMsgId(msgId);
        message.setBornTimestamp(bornTime);
        message.setEmpty(isEmpty);

        CompletableFuture<StoreResult> storeResultFuture = lmqQueueStore.putMessage(queueNames, message);
        return storeResultFuture.thenCompose(storeResult -> HookResult.newHookResult(HookResult.SUCCESS, null,
            JSON.toJSONBytes(storeResult)));
    }