static void ProcessPublishMessage()

in src/mqtt_client.c [690:775]


/*
 * Handles an incoming PUBLISH packet.
 *
 * Reads the topic name and, for QoS levels above "at most once", the packet
 * identifier from the buffer, then treats every remaining byte as the
 * payload. The resulting message is passed to the client's fnMessageRecv
 * callback; if that callback returns MQTT_CLIENT_ACK_SYNC the acknowledgement
 * is sent immediately through SendMessageAck.
 *
 * mqtt_client  - client instance supplying the receive callback, its context
 *                and the trace settings.
 * initialPos   - start of the buffer to parse; the topic name is read first.
 * packetLength - number of bytes available starting at initialPos.
 * flags        - PUBLISH header flags carrying the DUP, QoS and RETAIN bits.
 *
 * Failures (unreadable topic, packet id of 0 on a QoS > 0 message, message
 * creation or property errors) are reported through set_error_callback.
 */
static void ProcessPublishMessage(MQTT_CLIENT* mqtt_client, uint8_t* initialPos, size_t packetLength, int flags)
{
    bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) != 0;
    bool isRetainMsg = (flags & RETAIN_FLAG_MASK) != 0;

    /* DUP and RETAIN are independent of the QoS level, so strip them before
       decoding QoS. Testing the raw flags against 0 would misclassify a
       retained or duplicate QoS 0 message as DELIVER_EXACTLY_ONCE and cause
       the first two payload bytes to be consumed as a packet identifier. */
    int qosFlags = flags & ~(DUPLICATE_FLAG_MASK | RETAIN_FLAG_MASK);
    QOS_VALUE qosValue = (qosFlags == 0) ? DELIVER_AT_MOST_ONCE : (qosFlags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;

    uint8_t* iterator = initialPos;
    size_t lengthOfTopicName = packetLength;
    char* topicName = byteutil_readUTF(&iterator, &lengthOfTopicName);
    if (topicName == NULL)
    {
        LogError("Publish MSG: failure reading topic name");
        set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
    }
    else
    {
        /* Built up incrementally and only emitted once the message has been
           parsed successfully; stays NULL when tracing is off. */
        STRING_HANDLE trace_log = NULL;

#ifndef NO_LOGGING
        if (is_trace_enabled(mqtt_client))
        {
            trace_log = STRING_construct_sprintf("PUBLISH | IS_DUP: %s | RETAIN: %d | QOS: %s | TOPIC_NAME: %s", isDuplicateMsg ? TRUE_CONST : FALSE_CONST,
                isRetainMsg ? 1 : 0, MU_ENUM_TO_STRING_2(QOS_VALUE, qosValue), topicName);
        }
#endif
        uint16_t packetId = 0;
        /* Bytes left after the topic name. */
        size_t numberOfBytesToBeRead = packetLength - (size_t)(iterator - initialPos);
        if (qosValue != DELIVER_AT_MOST_ONCE)
        {
            /* Only QoS 1 and QoS 2 messages carry a packet identifier. */
            packetId = byteutil_read_uint16(&iterator, numberOfBytesToBeRead);
#ifndef NO_LOGGING
            if (is_trace_enabled(mqtt_client))
            {
                STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, packetId);
            }
#endif
        }
        if ((qosValue != DELIVER_AT_MOST_ONCE) && (packetId == 0))
        {
            /* A packet id of 0 is rejected for any message that needs an ack. */
            LogError("Publish MSG: packetId=0, invalid");
            set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
        }
        else
        {
            /* Everything after the topic name / packet id is the payload. */
            numberOfBytesToBeRead = packetLength - (size_t)(iterator - initialPos);

            MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create_in_place(packetId, topicName, qosValue, iterator, numberOfBytesToBeRead);
            if (msgHandle == NULL)
            {
                LogError("failure in mqttmessage_create");
                set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
            }
            else if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
                     mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
            {
                LogError("failure setting mqtt message property");
                set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
            }
            else
            {
#ifndef NO_LOGGING
                if (is_trace_enabled(mqtt_client))
                {
                    STRING_sprintf(trace_log, " | PAYLOAD_LEN: %lu", (unsigned long)numberOfBytesToBeRead);
                    log_incoming_trace(mqtt_client, trace_log);
                }
#endif
                MQTT_CLIENT_ACK_OPTION ack_option = mqtt_client->fnMessageRecv(msgHandle, mqtt_client->ctx);

                /* Acknowledge right away only when the callback asks for it;
                   any other return value sends no ack from this function. */
                if (ack_option == MQTT_CLIENT_ACK_SYNC)
                {
                    SendMessageAck(mqtt_client, packetId, qosValue);
                }
            }
            /* Called on every path of this branch, including when creation
               failed and msgHandle is NULL. */
            mqttmessage_destroy(msgHandle);
        }

        if (trace_log != NULL)
        {
            STRING_delete(trace_log);
        }

        free(topicName);
    }
}