src/mqtt_codec.c (1,038 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
#include <stdlib.h>
#include <limits.h>
#include "azure_c_shared_utility/optimize_size.h"
#include "azure_c_shared_utility/gballoc.h"
#include "azure_c_shared_utility/buffer_.h"
#include "azure_c_shared_utility/strings.h"
#include "azure_macro_utils/macro_utils.h"
#include "azure_c_shared_utility/xlogging.h"
#include "azure_umqtt_c/mqtt_codec.h"
#include <inttypes.h>
#define PAYLOAD_OFFSET 5
#define PACKET_TYPE_BYTE(p) (CONTROL_PACKET_TYPE)((uint8_t)(((uint8_t)(p)) & 0xf0))
#define FLAG_VALUE_BYTE(p) ((uint8_t)(((uint8_t)(p)) & 0xf))
#define USERNAME_FLAG 0x80
#define PASSWORD_FLAG 0x40
#define WILL_RETAIN_FLAG 0x20
#define WILL_QOS_FLAG_ 0x18
#define WILL_FLAG_FLAG 0x04
#define CLEAN_SESSION_FLAG 0x02
#define NEXT_128_CHUNK 0x80
#define PUBLISH_DUP_FLAG 0x8
#define PUBLISH_QOS_EXACTLY_ONCE 0x4
#define PUBLISH_QOS_AT_LEAST_ONCE 0x2
#define PUBLISH_QOS_RETAIN 0x1
#define PROTOCOL_NUMBER 4
#define CONN_FLAG_BYTE_OFFSET 7
#define CONNECT_FIXED_HEADER_SIZE 2
#define CONNECT_VARIABLE_HEADER_SIZE 10
#define SUBSCRIBE_FIXED_HEADER_FLAG 0x2
#define UNSUBSCRIBE_FIXED_HEADER_FLAG 0x2
#define MAX_SEND_SIZE 0xFFFFFF7F // 268435455
// This captures the maximum packet size for 3 digits.
// If it's above this value then we bail out of the loop
#define MAX_3_DIGIT_PACKET_SIZE 2097152
#define CODEC_STATE_VALUES \
CODEC_STATE_FIXED_HEADER, \
CODEC_STATE_VAR_HEADER, \
CODEC_STATE_PAYLOAD
static const char* const TRUE_CONST = "true";
static const char* const FALSE_CONST = "false";
MU_DEFINE_ENUM(CODEC_STATE_RESULT, CODEC_STATE_VALUES);
typedef struct MQTTCODEC_INSTANCE_TAG
{
CONTROL_PACKET_TYPE currPacket;
CODEC_STATE_RESULT codecState;
size_t bufferOffset;
int headerFlags;
BUFFER_HANDLE headerData;
ON_PACKET_COMPLETE_CALLBACK packetComplete;
void* callContext;
uint8_t storeRemainLen[4];
size_t remainLenIndex;
} MQTTCODEC_INSTANCE;
typedef struct PUBLISH_HEADER_INFO_TAG
{
const char* topicName;
uint16_t packetId;
const char* msgBuffer;
QOS_VALUE qualityOfServiceValue;
} PUBLISH_HEADER_INFO;
static const char* retrieve_qos_value(QOS_VALUE value)
{
switch (value)
{
case DELIVER_AT_MOST_ONCE:
return "DELIVER_AT_MOST_ONCE";
case DELIVER_AT_LEAST_ONCE:
return "DELIVER_AT_LEAST_ONCE";
case DELIVER_EXACTLY_ONCE:
default:
return "DELIVER_EXACTLY_ONCE";
}
}
static void byteutil_writeByte(uint8_t** buffer, uint8_t value)
{
if (buffer != NULL)
{
**buffer = value;
(*buffer)++;
}
}
static void byteutil_writeInt(uint8_t** buffer, uint16_t value)
{
if (buffer != NULL)
{
**buffer = (char)(value / 256);
(*buffer)++;
**buffer = (char)(value % 256);
(*buffer)++;
}
}
static void byteutil_writeUTF(uint8_t** buffer, const char* stringData, uint16_t len)
{
if (buffer != NULL)
{
byteutil_writeInt(buffer, len);
(void)memcpy(*buffer, stringData, len);
*buffer += len;
}
}
static CONTROL_PACKET_TYPE processControlPacketType(uint8_t pktByte, int* flags)
{
CONTROL_PACKET_TYPE result;
result = PACKET_TYPE_BYTE(pktByte);
if (flags != NULL)
{
*flags = FLAG_VALUE_BYTE(pktByte);
}
return result;
}
static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount, STRING_HANDLE trace_log)
{
int result = 0;
size_t index = 0;
for (index = 0; index < payloadCount && result == 0; index++)
{
// Add the Payload
size_t offsetLen = BUFFER_length(ctrlPacket);
size_t topicLen = strlen(payloadList[index]);
if (topicLen > USHRT_MAX)
{
result = MU_FAILURE;
}
else if (BUFFER_enlarge(ctrlPacket, topicLen + 2) != 0)
{
result = MU_FAILURE;
}
else
{
uint8_t* iterator = BUFFER_u_char(ctrlPacket);
iterator += offsetLen;
byteutil_writeUTF(&iterator, payloadList[index], (uint16_t)topicLen);
}
if (trace_log != NULL)
{
STRING_sprintf(trace_log, " | TOPIC_NAME: %s", payloadList[index]);
}
}
return result;
}
static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount, STRING_HANDLE trace_log)
{
int result = 0;
size_t index = 0;
for (index = 0; index < payloadCount && result == 0; index++)
{
// Add the Payload
size_t offsetLen = BUFFER_length(ctrlPacket);
size_t topicLen = strlen(payloadList[index].subscribeTopic);
if (topicLen > USHRT_MAX)
{
result = MU_FAILURE;
}
else if (BUFFER_enlarge(ctrlPacket, topicLen + 2 + 1) != 0)
{
result = MU_FAILURE;
}
else
{
uint8_t* iterator = BUFFER_u_char(ctrlPacket);
iterator += offsetLen;
byteutil_writeUTF(&iterator, payloadList[index].subscribeTopic, (uint16_t)topicLen);
*iterator = payloadList[index].qosReturn;
if (trace_log != NULL)
{
STRING_sprintf(trace_log, " | TOPIC_NAME: %s | QOS: %d", payloadList[index].subscribeTopic, (int)payloadList[index].qosReturn);
}
}
}
return result;
}
static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
{
int result = 0;
if (BUFFER_enlarge(ctrlPacket, CONNECT_VARIABLE_HEADER_SIZE) != 0)
{
result = MU_FAILURE;
}
else
{
uint8_t* iterator = BUFFER_u_char(ctrlPacket);
if (iterator == NULL)
{
result = MU_FAILURE;
}
else
{
if (trace_log != NULL)
{
STRING_sprintf(trace_log, " | VER: %d | KEEPALIVE: %d | FLAGS:", PROTOCOL_NUMBER, mqttOptions->keepAliveInterval);
}
byteutil_writeUTF(&iterator, "MQTT", 4);
byteutil_writeByte(&iterator, PROTOCOL_NUMBER);
byteutil_writeByte(&iterator, 0); // Flags will be entered later
byteutil_writeInt(&iterator, mqttOptions->keepAliveInterval);
result = 0;
}
}
return result;
}
static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader, STRING_HANDLE trace_log)
{
int result = 0;
size_t topicLen = 0;
size_t spaceLen = 0;
size_t idLen = 0;
size_t currLen = BUFFER_length(ctrlPacket);
topicLen = strlen(publishHeader->topicName);
spaceLen += 2;
if (publishHeader->qualityOfServiceValue != DELIVER_AT_MOST_ONCE)
{
// Packet Id is only set if the QOS is not 0
idLen = 2;
}
if (topicLen > USHRT_MAX)
{
result = MU_FAILURE;
}
else if (BUFFER_enlarge(ctrlPacket, topicLen + idLen + spaceLen) != 0)
{
result = MU_FAILURE;
}
else
{
uint8_t* iterator = BUFFER_u_char(ctrlPacket);
if (iterator == NULL)
{
result = MU_FAILURE;
}
else
{
iterator += currLen;
/* The Topic Name MUST be present as the first field in the PUBLISH Packet Variable header.It MUST be 792 a UTF-8 encoded string [MQTT-3.3.2-1] as defined in section 1.5.3.*/
byteutil_writeUTF(&iterator, publishHeader->topicName, (uint16_t)topicLen);
if (trace_log != NULL)
{
STRING_sprintf(trace_log, " | TOPIC_NAME: %s", publishHeader->topicName);
}
if (idLen > 0)
{
if (trace_log != NULL)
{
STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, publishHeader->packetId);
}
byteutil_writeInt(&iterator, publishHeader->packetId);
}
result = 0;
}
}
return result;
}
static int constructSubscibeTypeVariableHeader(BUFFER_HANDLE ctrlPacket, uint16_t packetId)
{
int result = 0;
if (BUFFER_enlarge(ctrlPacket, 2) != 0)
{
result = MU_FAILURE;
}
else
{
uint8_t* iterator = BUFFER_u_char(ctrlPacket);
if (iterator == NULL)
{
result = MU_FAILURE;
}
else
{
byteutil_writeInt(&iterator, packetId);
result = 0;
}
}
return result;
}
static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, uint8_t flags, uint16_t packetId)
{
BUFFER_HANDLE result = BUFFER_new();
if (result != NULL)
{
if (BUFFER_pre_build(result, 4) != 0)
{
BUFFER_delete(result);
result = NULL;
}
else
{
uint8_t* iterator = BUFFER_u_char(result);
if (iterator == NULL)
{
BUFFER_delete(result);
result = NULL;
}
else
{
*iterator = (uint8_t)type | flags;
iterator++;
*iterator = 0x2;
iterator++;
byteutil_writeInt(&iterator, packetId);
}
}
}
return result;
}
static int constructFixedHeader(BUFFER_HANDLE ctrlPacket, CONTROL_PACKET_TYPE packetType, uint8_t flags)
{
int result;
size_t packetLen = BUFFER_length(ctrlPacket);
uint8_t remainSize[4] ={ 0 };
size_t index = 0;
// Calculate the length of packet
do
{
uint8_t encode = packetLen % 128;
packetLen /= 128;
// if there are more data to encode, set the top bit of this byte
if (packetLen > 0)
{
encode |= NEXT_128_CHUNK;
}
remainSize[index++] = encode;
} while (packetLen > 0);
BUFFER_HANDLE fixedHeader = BUFFER_new();
if (fixedHeader == NULL)
{
result = MU_FAILURE;
}
else if (BUFFER_pre_build(fixedHeader, index + 1) != 0)
{
BUFFER_delete(fixedHeader);
result = MU_FAILURE;
}
else
{
uint8_t* iterator = BUFFER_u_char(fixedHeader);
*iterator = (uint8_t)packetType | flags;
iterator++;
(void)memcpy(iterator, remainSize, index);
result = BUFFER_prepend(ctrlPacket, fixedHeader);
BUFFER_delete(fixedHeader);
}
return result;
}
static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
{
int result = 0;
size_t clientLen = 0;
size_t usernameLen = 0;
size_t passwordLen = 0;
size_t willMessageLen = 0;
size_t willTopicLen = 0;
size_t spaceLen = 0;
size_t currLen = 0;
size_t totalLen = 0;
if (mqttOptions->clientId != NULL)
{
spaceLen += 2;
clientLen = strlen(mqttOptions->clientId);
}
if (mqttOptions->username != NULL)
{
spaceLen += 2;
usernameLen = strlen(mqttOptions->username);
}
if (mqttOptions->password != NULL)
{
spaceLen += 2;
passwordLen = strlen(mqttOptions->password);
}
if (mqttOptions->willMessage != NULL)
{
spaceLen += 2;
willMessageLen = strlen(mqttOptions->willMessage);
}
if (mqttOptions->willTopic != NULL)
{
spaceLen += 2;
willTopicLen = strlen(mqttOptions->willTopic);
}
currLen = ctrlPacket == NULL ? 0 : BUFFER_length(ctrlPacket);
totalLen = clientLen + usernameLen + passwordLen + willMessageLen + willTopicLen + spaceLen;
if (ctrlPacket == NULL)
{
result = MU_FAILURE;
}
// Validate the Username & Password
else if (clientLen > USHRT_MAX)
{
result = MU_FAILURE;
}
else if (usernameLen == 0 && passwordLen > 0)
{
result = MU_FAILURE;
}
else if ((willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0))
{
result = MU_FAILURE;
}
else if (BUFFER_enlarge(ctrlPacket, totalLen) != 0)
{
result = MU_FAILURE;
}
else
{
uint8_t* packet = BUFFER_u_char(ctrlPacket);
uint8_t* iterator = packet;
iterator += currLen;
byteutil_writeUTF(&iterator, mqttOptions->clientId, (uint16_t)clientLen);
// TODO: Read on the Will Topic
if (willMessageLen > USHRT_MAX || willTopicLen > USHRT_MAX || usernameLen > USHRT_MAX || passwordLen > USHRT_MAX)
{
result = MU_FAILURE;
}
else
{
STRING_HANDLE connect_payload_trace = NULL;
if (trace_log != NULL)
{
connect_payload_trace = STRING_new();
}
if (willMessageLen > 0 && willTopicLen > 0)
{
if (trace_log != NULL)
{
(void)STRING_sprintf(connect_payload_trace, " | WILL_TOPIC: %s", mqttOptions->willTopic);
}
packet[CONN_FLAG_BYTE_OFFSET] |= WILL_FLAG_FLAG;
byteutil_writeUTF(&iterator, mqttOptions->willTopic, (uint16_t)willTopicLen);
packet[CONN_FLAG_BYTE_OFFSET] |= (mqttOptions->qualityOfServiceValue << 3);
if (mqttOptions->messageRetain)
{
packet[CONN_FLAG_BYTE_OFFSET] |= WILL_RETAIN_FLAG;
}
byteutil_writeUTF(&iterator, mqttOptions->willMessage, (uint16_t)willMessageLen);
}
if (usernameLen > 0)
{
packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG;
byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen);
if (trace_log != NULL)
{
(void)STRING_sprintf(connect_payload_trace, " | USERNAME: %s", mqttOptions->username);
}
}
if (passwordLen > 0)
{
packet[CONN_FLAG_BYTE_OFFSET] |= PASSWORD_FLAG;
byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen);
if (trace_log != NULL)
{
(void)STRING_sprintf(connect_payload_trace, " | PWD: XXXX");
}
}
// TODO: Get the rest of the flags
if (trace_log != NULL)
{
(void)STRING_sprintf(connect_payload_trace, " | CLEAN: %s", mqttOptions->useCleanSession ? "1" : "0");
}
if (mqttOptions->useCleanSession)
{
packet[CONN_FLAG_BYTE_OFFSET] |= CLEAN_SESSION_FLAG;
}
if (trace_log != NULL)
{
(void)STRING_sprintf(trace_log, " %lu", packet[CONN_FLAG_BYTE_OFFSET]);
(void)STRING_concat_with_STRING(trace_log, connect_payload_trace);
STRING_delete(connect_payload_trace);
}
result = 0;
}
}
return result;
}
static int prepareheaderDataInfo(MQTTCODEC_INSTANCE* codecData, uint8_t remainLen)
{
int result;
result = 0;
codecData->storeRemainLen[codecData->remainLenIndex++] = remainLen;
if (remainLen <= 0x7f)
{
int multiplier = 1;
int totalLen = 0;
size_t index = 0;
uint8_t encodeByte = 0;
do
{
encodeByte = codecData->storeRemainLen[index++];
totalLen += (encodeByte & 127) * multiplier;
multiplier *= NEXT_128_CHUNK;
if (multiplier > MAX_3_DIGIT_PACKET_SIZE)
{
result = MU_FAILURE;
break;
}
} while ((encodeByte & NEXT_128_CHUNK) != 0);
if (result != 0 || totalLen > MAX_SEND_SIZE)
{
LogError("Receive buffer too large for MQTT packet");
result = MU_FAILURE;
}
else
{
codecData->codecState = CODEC_STATE_VAR_HEADER;
// Reset remainLen Index
codecData->remainLenIndex = 0;
memset(codecData->storeRemainLen, 0, 4 * sizeof(uint8_t));
if (totalLen > 0)
{
codecData->bufferOffset = 0;
codecData->headerData = BUFFER_new();
if (codecData->headerData == NULL)
{
/* Codes_SRS_MQTT_CODEC_07_035: [ If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value. ] */
LogError("Failed BUFFER_new");
result = MU_FAILURE;
}
else
{
if (BUFFER_pre_build(codecData->headerData, totalLen) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_035: [ If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value. ] */
LogError("Failed BUFFER_pre_build");
result = MU_FAILURE;
}
}
}
}
}
else if (codecData->remainLenIndex == (sizeof(codecData->storeRemainLen) / sizeof(codecData->storeRemainLen[0])))
{
// The maximum number of bytes in the Remaining Length field is four
// This allows applications to send Control Packets of size up to 268,435,455 (256 MB).
// The representation of this number on the wire is: 0xFF, 0xFF, 0xFF, 0x7F.
// The last byte has exceed the max value of 0x7F
LogError("MQTT packet len is invalid");
result = MU_FAILURE;
}
return result;
}
static void completePacketData(MQTTCODEC_INSTANCE* codecData)
{
if (codecData->packetComplete != NULL)
{
codecData->packetComplete(codecData->callContext, codecData->currPacket, codecData->headerFlags, codecData->headerData);
}
// Clean up data
codecData->currPacket = UNKNOWN_TYPE;
codecData->codecState = CODEC_STATE_FIXED_HEADER;
codecData->headerFlags = 0;
BUFFER_delete(codecData->headerData);
codecData->headerData = NULL;
}
static void clear_codec_data(MQTTCODEC_INSTANCE* codec_data)
{
// Clear the code instance data
codec_data->currPacket = UNKNOWN_TYPE;
codec_data->codecState = CODEC_STATE_FIXED_HEADER;
codec_data->headerFlags = 0;
codec_data->bufferOffset = 0;
codec_data->headerData = NULL;
memset(codec_data->storeRemainLen, 0, 4 * sizeof(uint8_t));
codec_data->remainLenIndex = 0;
}
void mqtt_codec_reset(MQTTCODEC_HANDLE handle)
{
if (handle != NULL)
{
clear_codec_data(handle);
}
}
MQTTCODEC_HANDLE mqtt_codec_create(ON_PACKET_COMPLETE_CALLBACK packetComplete, void* callbackCtx)
{
MQTTCODEC_HANDLE result;
result = malloc(sizeof(MQTTCODEC_INSTANCE));
/* Codes_SRS_MQTT_CODEC_07_001: [If a failure is encountered then mqtt_codec_create shall return NULL.] */
if (result != NULL)
{
/* Codes_SRS_MQTT_CODEC_07_002: [On success mqtt_codec_create shall return a MQTTCODEC_HANDLE value.] */
clear_codec_data(result);
result->packetComplete = packetComplete;
result->callContext = callbackCtx;
}
return result;
}
void mqtt_codec_destroy(MQTTCODEC_HANDLE handle)
{
/* Codes_SRS_MQTT_CODEC_07_003: [If the handle parameter is NULL then mqtt_codec_destroy shall do nothing.] */
if (handle != NULL)
{
MQTTCODEC_INSTANCE* codecData = (MQTTCODEC_INSTANCE*)handle;
/* Codes_SRS_MQTT_CODEC_07_004: [mqtt_codec_destroy shall deallocate all memory that has been allocated by this object.] */
BUFFER_delete(codecData->headerData);
free(codecData);
}
}
BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
{
BUFFER_HANDLE result;
/* Codes_SRS_MQTT_CODEC_07_008: [If the parameters mqttOptions is NULL then mqtt_codec_connect shall return a null value.] */
if (mqttOptions == NULL)
{
result = NULL;
}
else
{
/* Codes_SRS_MQTT_CODEC_07_009: [mqtt_codec_connect shall construct a BUFFER_HANDLE that represents a MQTT CONNECT packet.] */
result = BUFFER_new();
if (result != NULL)
{
STRING_HANDLE varible_header_log = NULL;
if (trace_log != NULL)
{
varible_header_log = STRING_new();
}
// Add Variable Header Information
if (constructConnectVariableHeader(result, mqttOptions, varible_header_log) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
if (constructConnPayload(result, mqttOptions, varible_header_log) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
if (trace_log != NULL)
{
(void)STRING_copy(trace_log, "CONNECT");
}
if (constructFixedHeader(result, CONNECT_TYPE, 0) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
if (trace_log != NULL)
{
(void)STRING_concat_with_STRING(trace_log, varible_header_log);
}
}
}
if (varible_header_log != NULL)
{
STRING_delete(varible_header_log);
}
}
}
}
return result;
}
BUFFER_HANDLE mqtt_codec_disconnect()
{
/* Codes_SRS_MQTT_CODEC_07_011: [On success mqtt_codec_disconnect shall construct a BUFFER_HANDLE that represents a MQTT DISCONNECT packet.] */
BUFFER_HANDLE result = BUFFER_new();
if (result != NULL)
{
if (BUFFER_enlarge(result, 2) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
uint8_t* iterator = BUFFER_u_char(result);
if (iterator == NULL)
{
/* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
iterator[0] = DISCONNECT_TYPE;
iterator[1] = 0;
}
}
}
return result;
}
BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen, STRING_HANDLE trace_log)
{
BUFFER_HANDLE result;
/* Codes_SRS_MQTT_CODEC_07_005: [If the parameters topicName is NULL then mqtt_codec_publish shall return NULL.] */
if (topicName == NULL)
{
result = NULL;
}
/* Codes_SRS_MQTT_CODEC_07_036: [mqtt_codec_publish shall return NULL if the buffLen variable is greater than the MAX_SEND_SIZE (0xFFFFFF7F).] */
else if (buffLen > MAX_SEND_SIZE)
{
/* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
result = NULL;
}
else
{
PUBLISH_HEADER_INFO publishInfo ={ 0 };
publishInfo.topicName = topicName;
publishInfo.packetId = packetId;
publishInfo.qualityOfServiceValue = qosValue;
uint8_t headerFlags = 0;
if (duplicateMsg) headerFlags |= PUBLISH_DUP_FLAG;
if (serverRetain) headerFlags |= PUBLISH_QOS_RETAIN;
if (qosValue != DELIVER_AT_MOST_ONCE)
{
if (qosValue == DELIVER_AT_LEAST_ONCE)
{
headerFlags |= PUBLISH_QOS_AT_LEAST_ONCE;
}
else
{
headerFlags |= PUBLISH_QOS_EXACTLY_ONCE;
}
}
/* Codes_SRS_MQTT_CODEC_07_007: [mqtt_codec_publish shall return a BUFFER_HANDLE that represents a MQTT PUBLISH message.] */
result = BUFFER_new();
if (result != NULL)
{
STRING_HANDLE varible_header_log = NULL;
if (trace_log != NULL)
{
varible_header_log = STRING_construct_sprintf(" | IS_DUP: %s | RETAIN: %d | QOS: %s", duplicateMsg ? TRUE_CONST : FALSE_CONST,
serverRetain ? 1 : 0,
retrieve_qos_value(publishInfo.qualityOfServiceValue) );
}
if (constructPublishVariableHeader(result, &publishInfo, varible_header_log) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
size_t payloadOffset = BUFFER_length(result);
if (buffLen > 0)
{
if (BUFFER_enlarge(result, buffLen) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
uint8_t* iterator = BUFFER_u_char(result);
if (iterator == NULL)
{
/* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
iterator += payloadOffset;
// Write Message
(void)memcpy(iterator, msgBuffer, buffLen);
if (trace_log)
{
STRING_sprintf(varible_header_log, " | PAYLOAD_LEN: %lu", (unsigned long)buffLen);
}
}
}
}
if (result != NULL)
{
if (trace_log != NULL)
{
(void)STRING_copy(trace_log, "PUBLISH");
}
if (constructFixedHeader(result, PUBLISH_TYPE, headerFlags) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
if (trace_log != NULL)
{
(void)STRING_concat_with_STRING(trace_log, varible_header_log);
}
}
}
}
if (varible_header_log != NULL)
{
STRING_delete(varible_header_log);
}
}
}
return result;
}
BUFFER_HANDLE mqtt_codec_publishAck(uint16_t packetId)
{
/* Codes_SRS_MQTT_CODEC_07_013: [On success mqtt_codec_publishAck shall return a BUFFER_HANDLE representation of a MQTT PUBACK packet.] */
/* Codes_SRS_MQTT_CODEC_07_014 : [If any error is encountered then mqtt_codec_publishAck shall return NULL.] */
BUFFER_HANDLE result = constructPublishReply(PUBACK_TYPE, 0, packetId);
return result;
}
BUFFER_HANDLE mqtt_codec_publishReceived(uint16_t packetId)
{
/* Codes_SRS_MQTT_CODEC_07_015: [On success mqtt_codec_publishRecieved shall return a BUFFER_HANDLE representation of a MQTT PUBREC packet.] */
/* Codes_SRS_MQTT_CODEC_07_016 : [If any error is encountered then mqtt_codec_publishRecieved shall return NULL.] */
BUFFER_HANDLE result = constructPublishReply(PUBREC_TYPE, 0, packetId);
return result;
}
BUFFER_HANDLE mqtt_codec_publishRelease(uint16_t packetId)
{
/* Codes_SRS_MQTT_CODEC_07_017: [On success mqtt_codec_publishRelease shall return a BUFFER_HANDLE representation of a MQTT PUBREL packet.] */
/* Codes_SRS_MQTT_CODEC_07_018 : [If any error is encountered then mqtt_codec_publishRelease shall return NULL.] */
BUFFER_HANDLE result = constructPublishReply(PUBREL_TYPE, 2, packetId);
return result;
}
BUFFER_HANDLE mqtt_codec_publishComplete(uint16_t packetId)
{
/* Codes_SRS_MQTT_CODEC_07_019: [On success mqtt_codec_publishComplete shall return a BUFFER_HANDLE representation of a MQTT PUBCOMP packet.] */
/* Codes_SRS_MQTT_CODEC_07_020 : [If any error is encountered then mqtt_codec_publishComplete shall return NULL.] */
BUFFER_HANDLE result = constructPublishReply(PUBCOMP_TYPE, 0, packetId);
return result;
}
BUFFER_HANDLE mqtt_codec_ping()
{
/* Codes_SRS_MQTT_CODEC_07_021: [On success mqtt_codec_ping shall construct a BUFFER_HANDLE that represents a MQTT PINGREQ packet.] */
BUFFER_HANDLE result = BUFFER_new();
if (result != NULL)
{
if (BUFFER_enlarge(result, 2) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
uint8_t* iterator = BUFFER_u_char(result);
if (iterator == NULL)
{
/* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
iterator[0] = PINGREQ_TYPE;
iterator[1] = 0;
}
}
}
return result;
}
BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count, STRING_HANDLE trace_log)
{
BUFFER_HANDLE result;
/* Codes_SRS_MQTT_CODEC_07_023: [If the parameters subscribeList is NULL or if count is 0 then mqtt_codec_subscribe shall return NULL.] */
if (subscribeList == NULL || count == 0)
{
result = NULL;
}
else
{
/* Codes_SRS_MQTT_CODEC_07_026: [mqtt_codec_subscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.]*/
result = BUFFER_new();
if (result != NULL)
{
if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
STRING_HANDLE sub_trace = NULL;
if (trace_log != NULL)
{
sub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
}
/* Codes_SRS_MQTT_CODEC_07_024: [mqtt_codec_subscribe shall iterate through count items in the subscribeList.] */
if (addListItemsToSubscribePacket(result, subscribeList, count, sub_trace) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
if (trace_log != NULL)
{
STRING_concat(trace_log, "SUBSCRIBE");
}
if (constructFixedHeader(result, SUBSCRIBE_TYPE, SUBSCRIBE_FIXED_HEADER_FLAG) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
if (trace_log != NULL)
{
(void)STRING_concat_with_STRING(trace_log, sub_trace);
}
}
}
if (sub_trace != NULL)
{
STRING_delete(sub_trace);
}
}
}
}
return result;
}
BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count, STRING_HANDLE trace_log)
{
BUFFER_HANDLE result;
/* Codes_SRS_MQTT_CODEC_07_027: [If the parameters unsubscribeList is NULL or if count is 0 then mqtt_codec_unsubscribe shall return NULL.] */
if (unsubscribeList == NULL || count == 0)
{
result = NULL;
}
else
{
/* Codes_SRS_MQTT_CODEC_07_030: [mqtt_codec_unsubscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.] */
result = BUFFER_new();
if (result != NULL)
{
if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
STRING_HANDLE unsub_trace = NULL;
if (trace_log != NULL)
{
unsub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
}
/* Codes_SRS_MQTT_CODEC_07_028: [mqtt_codec_unsubscribe shall iterate through count items in the unsubscribeList.] */
if (addListItemsToUnsubscribePacket(result, unsubscribeList, count, unsub_trace) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
if (trace_log != NULL)
{
(void)STRING_copy(trace_log, "UNSUBSCRIBE");
}
if (constructFixedHeader(result, UNSUBSCRIBE_TYPE, UNSUBSCRIBE_FIXED_HEADER_FLAG) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
BUFFER_delete(result);
result = NULL;
}
else
{
if (trace_log != NULL)
{
(void)STRING_concat_with_STRING(trace_log, unsub_trace);
}
}
}
if (unsub_trace != NULL)
{
STRING_delete(unsub_trace);
}
}
}
}
return result;
}
int mqtt_codec_bytesReceived(MQTTCODEC_HANDLE handle, const unsigned char* buffer, size_t size)
{
int result;
MQTTCODEC_INSTANCE* codec_Data = (MQTTCODEC_INSTANCE*)handle;
/* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
if (codec_Data == NULL)
{
result = MU_FAILURE;
}
/* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
/* Codes_SRS_MQTT_CODEC_07_032: [If the parameters size is zero then mqtt_codec_bytesReceived shall return a non-zero value.] */
else if (buffer == NULL || size == 0)
{
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
else
{
/* Codes_SRS_MQTT_CODEC_07_033: [mqtt_codec_bytesReceived constructs a sequence of bytes into the corresponding MQTT packets and on success returns zero.] */
result = 0;
size_t index = 0;
for (index = 0; index < size && result == 0; index++)
{
uint8_t iterator = ((int8_t*)buffer)[index];
if (codec_Data->codecState == CODEC_STATE_FIXED_HEADER)
{
if (codec_Data->currPacket == UNKNOWN_TYPE)
{
codec_Data->currPacket = processControlPacketType(iterator, &codec_Data->headerFlags);
// validate packet type and invalid reserved header flags
switch (codec_Data->currPacket)
{
case PACKET_INVALID1_TYPE:
case PACKET_INVALID2_TYPE:
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
break;
case CONNECT_TYPE:
case CONNACK_TYPE:
case PUBACK_TYPE:
case PUBREC_TYPE:
case PUBCOMP_TYPE:
case SUBACK_TYPE:
case UNSUBACK_TYPE:
case PINGREQ_TYPE:
case PINGRESP_TYPE:
case DISCONNECT_TYPE:
if (codec_Data->headerFlags & 0x0F) // flags must be all zeros
{
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
break;
case PUBREL_TYPE:
case SUBSCRIBE_TYPE:
case UNSUBSCRIBE_TYPE:
if ((codec_Data->headerFlags & 0x0F) != 0x02) // only bit 1 must be set
{
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
break;
case PUBLISH_TYPE:
case CONTROL_PACKET_TYPE_INVALID:
case PACKET_TYPE_ERROR:
case UNKNOWN_TYPE:
break;
}
}
else
{
if (prepareheaderDataInfo(codec_Data, iterator) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
else if (codec_Data->currPacket == PINGRESP_TYPE)
{
// PINGRESP must not have a payload
if (((int8_t*)buffer)[index] == 0)
{
/* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
completePacketData(codec_Data);
}
else
{
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
}
}
}
else if (codec_Data->codecState == CODEC_STATE_VAR_HEADER)
{
if (codec_Data->headerData == NULL)
{
codec_Data->codecState = CODEC_STATE_PAYLOAD;
}
else
{
uint8_t* dataBytes = BUFFER_u_char(codec_Data->headerData);
if (dataBytes == NULL)
{
/* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
else
{
// Increment the data
dataBytes += codec_Data->bufferOffset++;
*dataBytes = iterator;
size_t totalLen = BUFFER_length(codec_Data->headerData);
if (codec_Data->bufferOffset >= totalLen)
{
/* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
completePacketData(codec_Data);
}
}
}
}
else
{
/* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
}
}
return result;
}