in src/mqtt_client.c [690:775]
/*
 * Parses an incoming PUBLISH packet and hands the resulting message to the
 * application's receive callback.
 *
 * mqtt_client  - client whose fnMessageRecv callback (and ctx) receives the
 *                message and whose error callback is raised on failure.
 * initialPos   - start of the data to parse; the topic name is read first,
 *                then (for QoS > 0) the packet id, and the rest is payload.
 * packetLength - number of bytes available starting at initialPos.
 * flags        - header flag bits, tested against DUPLICATE_FLAG_MASK,
 *                RETAIN_FLAG_MASK and QOS_LEAST_ONCE_FLAG_MASK.
 *
 * Errors are reported through set_error_callback: MQTT_CLIENT_PARSE_ERROR
 * for a malformed packet, MQTT_CLIENT_MEMORY_ERROR when the message object
 * cannot be created or configured.  When the receive callback returns
 * MQTT_CLIENT_ACK_SYNC the acknowledgement is sent before returning.
 */
static void ProcessPublishMessage(MQTT_CLIENT* mqtt_client, uint8_t* initialPos, size_t packetLength, int flags)
{
    bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
    bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;

    /* Decode QoS from the flag bits with DUP and RETAIN masked out.  Testing
     * the whole flag word against 0 would classify a QoS 0 message that has
     * RETAIN (or DUP) set as DELIVER_EXACTLY_ONCE and then consume the first
     * two payload bytes as a packet id. */
    int qosFlags = flags & ~(DUPLICATE_FLAG_MASK | RETAIN_FLAG_MASK);
    QOS_VALUE qosValue = (qosFlags == 0) ? DELIVER_AT_MOST_ONCE : (qosFlags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;

    uint8_t* iterator = initialPos;
    size_t lengthOfTopicName = packetLength;
    char* topicName = byteutil_readUTF(&iterator, &lengthOfTopicName);
    if (topicName == NULL)
    {
        LogError("Publish MSG: failure reading topic name");
        set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
    }
    else
    {
        STRING_HANDLE trace_log = NULL;
#ifndef NO_LOGGING
        if (is_trace_enabled(mqtt_client))
        {
            trace_log = STRING_construct_sprintf("PUBLISH | IS_DUP: %s | RETAIN: %d | QOS: %s | TOPIC_NAME: %s", isDuplicateMsg ? TRUE_CONST : FALSE_CONST,
                isRetainMsg ? 1 : 0, MU_ENUM_TO_STRING_2(QOS_VALUE, qosValue), topicName);
        }
#endif
        uint16_t packetId = 0;

        /* Track how much of the packet has been consumed so that the
         * "remaining bytes" subtractions below can never wrap around.
         * NOTE(review): the byteutil readers may already enforce this;
         * their bounds behaviour is not visible from this function. */
        size_t bytesConsumed = (size_t)(iterator - initialPos);
        bool lengthOk = (bytesConsumed <= packetLength);

        if (lengthOk && (qosValue != DELIVER_AT_MOST_ONCE))
        {
            packetId = byteutil_read_uint16(&iterator, packetLength - bytesConsumed);
#ifndef NO_LOGGING
            if (is_trace_enabled(mqtt_client))
            {
                STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, packetId);
            }
#endif
            bytesConsumed = (size_t)(iterator - initialPos);
            lengthOk = (bytesConsumed <= packetLength);
        }

        if (!lengthOk)
        {
            LogError("Publish MSG: packet shorter than its header fields");
            set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
        }
        else if ((qosValue != DELIVER_AT_MOST_ONCE) && (packetId == 0))
        {
            LogError("Publish MSG: packetId=0, invalid");
            set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
        }
        else
        {
            /* Everything after the topic name / packet id is the payload. */
            size_t payloadLength = packetLength - bytesConsumed;
            MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create_in_place(packetId, topicName, qosValue, iterator, payloadLength);
            if (msgHandle == NULL)
            {
                LogError("failure in mqttmessage_create");
                set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
            }
            else if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
                mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
            {
                LogError("failure setting mqtt message property");
                set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
            }
            else
            {
#ifndef NO_LOGGING
                if (is_trace_enabled(mqtt_client))
                {
                    STRING_sprintf(trace_log, " | PAYLOAD_LEN: %lu", (unsigned long)payloadLength);
                    log_incoming_trace(mqtt_client, trace_log);
                }
#endif
                MQTT_CLIENT_ACK_OPTION ack_option = mqtt_client->fnMessageRecv(msgHandle, mqtt_client->ctx);
                if (ack_option == MQTT_CLIENT_ACK_SYNC)
                {
                    SendMessageAck(mqtt_client, packetId, qosValue);
                }
            }
            /* Called on every path through this branch, including when
             * msgHandle is NULL, exactly as before. */
            mqttmessage_destroy(msgHandle);
        }

        if (trace_log != NULL)
        {
            STRING_delete(trace_log);
        }
        /* Freed only after the message handle has been destroyed, since the
         * message was created "in place" from this buffer. */
        free(topicName);
    }
}