in src/mqtt_codec.c [1054:1190]
int mqtt_codec_bytesReceived(MQTTCODEC_HANDLE handle, const unsigned char* buffer, size_t size)
{
int result;
MQTTCODEC_INSTANCE* codec_Data = (MQTTCODEC_INSTANCE*)handle;
/* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
if (codec_Data == NULL)
{
result = MU_FAILURE;
}
/* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
/* Codes_SRS_MQTT_CODEC_07_032: [If the parameters size is zero then mqtt_codec_bytesReceived shall return a non-zero value.] */
else if (buffer == NULL || size == 0)
{
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
else
{
/* Codes_SRS_MQTT_CODEC_07_033: [mqtt_codec_bytesReceived constructs a sequence of bytes into the corresponding MQTT packets and on success returns zero.] */
result = 0;
size_t index = 0;
for (index = 0; index < size && result == 0; index++)
{
uint8_t iterator = ((int8_t*)buffer)[index];
if (codec_Data->codecState == CODEC_STATE_FIXED_HEADER)
{
if (codec_Data->currPacket == UNKNOWN_TYPE)
{
codec_Data->currPacket = processControlPacketType(iterator, &codec_Data->headerFlags);
// validate packet type and invalid reserved header flags
switch (codec_Data->currPacket)
{
case PACKET_INVALID1_TYPE:
case PACKET_INVALID2_TYPE:
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
break;
case CONNECT_TYPE:
case CONNACK_TYPE:
case PUBACK_TYPE:
case PUBREC_TYPE:
case PUBCOMP_TYPE:
case SUBACK_TYPE:
case UNSUBACK_TYPE:
case PINGREQ_TYPE:
case PINGRESP_TYPE:
case DISCONNECT_TYPE:
if (codec_Data->headerFlags & 0x0F) // flags must be all zeros
{
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
break;
case PUBREL_TYPE:
case SUBSCRIBE_TYPE:
case UNSUBSCRIBE_TYPE:
if ((codec_Data->headerFlags & 0x0F) != 0x02) // only bit 1 must be set
{
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
break;
case PUBLISH_TYPE:
case CONTROL_PACKET_TYPE_INVALID:
case PACKET_TYPE_ERROR:
case UNKNOWN_TYPE:
break;
}
}
else
{
if (prepareheaderDataInfo(codec_Data, iterator) != 0)
{
/* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
else if (codec_Data->currPacket == PINGRESP_TYPE)
{
// PINGRESP must not have a payload
if (((int8_t*)buffer)[index] == 0)
{
/* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
completePacketData(codec_Data);
}
else
{
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
}
}
}
else if (codec_Data->codecState == CODEC_STATE_VAR_HEADER)
{
if (codec_Data->headerData == NULL)
{
codec_Data->codecState = CODEC_STATE_PAYLOAD;
}
else
{
uint8_t* dataBytes = BUFFER_u_char(codec_Data->headerData);
if (dataBytes == NULL)
{
/* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
else
{
// Increment the data
dataBytes += codec_Data->bufferOffset++;
*dataBytes = iterator;
size_t totalLen = BUFFER_length(codec_Data->headerData);
if (codec_Data->bufferOffset >= totalLen)
{
/* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
completePacketData(codec_Data);
}
}
}
}
else
{
/* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
codec_Data->currPacket = PACKET_TYPE_ERROR;
result = MU_FAILURE;
}
}
}
return result;
}