in src/mqtt_client.c [777:1011]
static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
{
MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
if (mqtt_client != NULL)
{
size_t packetLength = 0;
uint8_t* iterator = NULL;
if (headerData != NULL)
{
packetLength = BUFFER_length(headerData);
iterator = BUFFER_u_char(headerData);
}
#ifdef ENABLE_RAW_TRACE
logIncomingRawTrace(mqtt_client, packet, (uint8_t)flags, iterator, packetLength);
#endif
if ((iterator != NULL && packetLength > 0) || packet == PINGRESP_TYPE)
{
switch (packet)
{
case CONNACK_TYPE:
{
/*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
CONNECT_ACK connack = { 0 };
if (packetLength != 2) // CONNACK payload must be only 2 bytes
{
LogError("Invalid CONNACK packet.");
set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
break;
}
uint8_t connect_acknowledge_flags = byteutil_readByte(&iterator);
if (connect_acknowledge_flags & 0xFE) // bits 7-1 must be zero
{
LogError("Invalid CONNACK packet.");
set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
break;
}
connack.isSessionPresent = (connect_acknowledge_flags == 0x1) ? true : false;
uint8_t rc = byteutil_readByte(&iterator);
connack.returnCode =
(rc < ((uint8_t)CONN_REFUSED_UNKNOWN)) ?
(CONNECT_RETURN_CODE)rc : CONN_REFUSED_UNKNOWN;
#ifndef NO_LOGGING
if (is_trace_enabled(mqtt_client))
{
STRING_HANDLE trace_log = STRING_construct_sprintf("CONNACK | SESSION_PRESENT: %s | RETURN_CODE: 0x%x", connack.isSessionPresent ? TRUE_CONST : FALSE_CONST, connack.returnCode);
log_incoming_trace(mqtt_client, trace_log);
STRING_delete(trace_log);
}
#endif
mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqtt_client->ctx);
if (connack.returnCode == CONNECTION_ACCEPTED)
{
mqtt_client->mqtt_status |= MQTT_STATUS_CLIENT_CONNECTED;
}
break;
}
case PUBLISH_TYPE:
{
ProcessPublishMessage(mqtt_client, iterator, packetLength, flags);
break;
}
case PUBACK_TYPE:
case PUBREC_TYPE:
case PUBREL_TYPE:
case PUBCOMP_TYPE:
{
if (packetLength != 2) // PUBXXX payload must be only 2 bytes
{
LogError("Invalid packet length.");
set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
break;
}
/*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
(packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
(packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;
PUBLISH_ACK publish_ack = { 0 };
publish_ack.packetId = byteutil_read_uint16(&iterator, packetLength);
#ifndef NO_LOGGING
if (is_trace_enabled(mqtt_client))
{
STRING_HANDLE trace_log = STRING_construct_sprintf("%s | PACKET_ID: %"PRIu16, packet == PUBACK_TYPE ? "PUBACK" : (packet == PUBREC_TYPE) ? "PUBREC" : (packet == PUBREL_TYPE) ? "PUBREL" : "PUBCOMP",
publish_ack.packetId);
log_incoming_trace(mqtt_client, trace_log);
STRING_delete(trace_log);
}
#endif
BUFFER_HANDLE pubRel = NULL;
mqtt_client->fnOperationCallback(mqtt_client, action, (void*)&publish_ack, mqtt_client->ctx);
if (packet == PUBREC_TYPE)
{
pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
if (pubRel == NULL)
{
LogError("Failed to allocate publish release message.");
set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
}
}
else if (packet == PUBREL_TYPE)
{
pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
if (pubRel == NULL)
{
LogError("Failed to allocate publish complete message.");
set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
}
}
if (pubRel != NULL)
{
size_t size = BUFFER_length(pubRel);
if (sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size) != 0)
{
LogError("Failed sending publish reply.");
set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
}
BUFFER_delete(pubRel);
}
break;
}
case SUBACK_TYPE:
{
/*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
SUBSCRIBE_ACK suback = { 0 };
size_t remainLen = packetLength;
suback.packetId = byteutil_read_uint16(&iterator, packetLength);
remainLen -= 2;
#ifndef NO_LOGGING
STRING_HANDLE trace_log = NULL;
if (is_trace_enabled(mqtt_client))
{
trace_log = STRING_construct_sprintf("SUBACK | PACKET_ID: %"PRIu16, suback.packetId);
}
#endif
// Allocate the remaining len
size_t malloc_size = safe_multiply_size_t(sizeof(QOS_VALUE), remainLen);
if (malloc_size != SIZE_MAX &&
(suback.qosReturn = (QOS_VALUE*)malloc(malloc_size)) != NULL)
{
while (remainLen > 0)
{
uint8_t qosRet = byteutil_readByte(&iterator);
if (qosRet & 0x7C) // SUBACK QOS bits 6-2 must be zero
{
LogError("Invalid SUBACK_TYPE packet.");
set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
break;
}
suback.qosReturn[suback.qosCount++] =
(qosRet <= ((uint8_t)DELIVER_EXACTLY_ONCE)) ?
(QOS_VALUE)qosRet : DELIVER_FAILURE;
remainLen--;
#ifndef NO_LOGGING
if (is_trace_enabled(mqtt_client))
{
STRING_sprintf(trace_log, " | RETURN_CODE: %"PRIu16, suback.qosReturn[suback.qosCount-1]);
}
#endif
}
#ifndef NO_LOGGING
if (is_trace_enabled(mqtt_client))
{
log_incoming_trace(mqtt_client, trace_log);
STRING_delete(trace_log);
}
#endif
mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqtt_client->ctx);
free(suback.qosReturn);
}
else
{
LogError("allocation of quality of service value failed, size:%zu", malloc_size);
set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
}
break;
}
case UNSUBACK_TYPE:
{
/*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
UNSUBSCRIBE_ACK unsuback = { 0 };
if (packetLength != 2) // UNSUBACK_TYPE payload must be only 2 bytes
{
LogError("Invalid UNSUBACK packet length.");
set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
break;
}
unsuback.packetId = byteutil_read_uint16(&iterator, packetLength);
#ifndef NO_LOGGING
if (is_trace_enabled(mqtt_client))
{
STRING_HANDLE trace_log = STRING_construct_sprintf("UNSUBACK | PACKET_ID: %"PRIu16, unsuback.packetId);
log_incoming_trace(mqtt_client, trace_log);
STRING_delete(trace_log);
}
#endif
mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqtt_client->ctx);
break;
}
case PINGRESP_TYPE:
mqtt_client->timeSincePing = 0;
#ifndef NO_LOGGING
if (is_trace_enabled(mqtt_client))
{
STRING_HANDLE trace_log = STRING_construct_sprintf("PINGRESP");
log_incoming_trace(mqtt_client, trace_log);
STRING_delete(trace_log);
}
#endif
// Forward ping response to operation callback
mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_PING_RESPONSE, NULL, mqtt_client->ctx);
break;
default:
break;
}
}
}
else
{
LogError("recvCompleteCallback context failed.");
}
}