static void recvCompleteCallback()

in src/mqtt_client.c [777:1011]


static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
{
    MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
    if (mqtt_client != NULL)
    {
        size_t packetLength = 0;
        uint8_t* iterator = NULL;
        if (headerData != NULL)
        {
            packetLength = BUFFER_length(headerData);
            iterator = BUFFER_u_char(headerData);
        }

#ifdef ENABLE_RAW_TRACE
        logIncomingRawTrace(mqtt_client, packet, (uint8_t)flags, iterator, packetLength);
#endif
        if ((iterator != NULL && packetLength > 0) || packet == PINGRESP_TYPE)
        {
            switch (packet)
            {
                case CONNACK_TYPE:
                {
                    /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
                    CONNECT_ACK connack = { 0 };
                    if (packetLength != 2) // CONNACK payload must be only 2 bytes
                    {
                        LogError("Invalid CONNACK packet.");
                        set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
                        break;
                    }

                    uint8_t connect_acknowledge_flags = byteutil_readByte(&iterator);
                    if (connect_acknowledge_flags & 0xFE) // bits 7-1 must be zero
                    {
                        LogError("Invalid CONNACK packet.");
                        set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
                        break;
                    }

                    connack.isSessionPresent = (connect_acknowledge_flags == 0x1) ? true : false;
                    uint8_t rc = byteutil_readByte(&iterator);
                    connack.returnCode =
                        (rc < ((uint8_t)CONN_REFUSED_UNKNOWN)) ?
                        (CONNECT_RETURN_CODE)rc : CONN_REFUSED_UNKNOWN;

#ifndef NO_LOGGING
                    if (is_trace_enabled(mqtt_client))
                    {
                        STRING_HANDLE trace_log = STRING_construct_sprintf("CONNACK | SESSION_PRESENT: %s | RETURN_CODE: 0x%x", connack.isSessionPresent ? TRUE_CONST : FALSE_CONST, connack.returnCode);
                        log_incoming_trace(mqtt_client, trace_log);
                        STRING_delete(trace_log);
                    }
#endif
                    mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqtt_client->ctx);

                    if (connack.returnCode == CONNECTION_ACCEPTED)
                    {
                        mqtt_client->mqtt_status |= MQTT_STATUS_CLIENT_CONNECTED;
                    }
                    break;
                }
                case PUBLISH_TYPE:
                {
                    ProcessPublishMessage(mqtt_client, iterator, packetLength, flags);
                    break;
                }
                case PUBACK_TYPE:
                case PUBREC_TYPE:
                case PUBREL_TYPE:
                case PUBCOMP_TYPE:
                {
                    if (packetLength != 2) // PUBXXX payload must be only 2 bytes
                    {
                        LogError("Invalid packet length.");
                        set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
                        break;
                    }

                    /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
                    MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
                        (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
                        (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;

                    PUBLISH_ACK publish_ack = { 0 };
                    publish_ack.packetId = byteutil_read_uint16(&iterator, packetLength);

#ifndef NO_LOGGING
                    if (is_trace_enabled(mqtt_client))
                    {
                        STRING_HANDLE trace_log = STRING_construct_sprintf("%s | PACKET_ID: %"PRIu16, packet == PUBACK_TYPE ? "PUBACK" : (packet == PUBREC_TYPE) ? "PUBREC" : (packet == PUBREL_TYPE) ? "PUBREL" : "PUBCOMP",
                            publish_ack.packetId);

                        log_incoming_trace(mqtt_client, trace_log);
                        STRING_delete(trace_log);
                    }
#endif
                    BUFFER_HANDLE pubRel = NULL;
                    mqtt_client->fnOperationCallback(mqtt_client, action, (void*)&publish_ack, mqtt_client->ctx);
                    if (packet == PUBREC_TYPE)
                    {
                        pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
                        if (pubRel == NULL)
                        {
                            LogError("Failed to allocate publish release message.");
                            set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                        }
                    }
                    else if (packet == PUBREL_TYPE)
                    {
                        pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
                        if (pubRel == NULL)
                        {
                            LogError("Failed to allocate publish complete message.");
                            set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                        }
                    }
                    if (pubRel != NULL)
                    {
                        size_t size = BUFFER_length(pubRel);
                        if (sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size) != 0)
                        {
                            LogError("Failed sending publish reply.");
                            set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
                        }
                        BUFFER_delete(pubRel);
                    }
                    break;
                }
                case SUBACK_TYPE:
                {

                    /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
                    SUBSCRIBE_ACK suback = { 0 };

                    size_t remainLen = packetLength;
                    suback.packetId = byteutil_read_uint16(&iterator, packetLength);
                    remainLen -= 2;

#ifndef NO_LOGGING
                    STRING_HANDLE trace_log = NULL;
                    if (is_trace_enabled(mqtt_client))
                    {
                        trace_log = STRING_construct_sprintf("SUBACK | PACKET_ID: %"PRIu16, suback.packetId);
                    }
#endif
                    // Allocate the remaining len
                    size_t malloc_size = safe_multiply_size_t(sizeof(QOS_VALUE), remainLen);
                    if (malloc_size != SIZE_MAX && 
                        (suback.qosReturn = (QOS_VALUE*)malloc(malloc_size)) != NULL)
                    {
                        while (remainLen > 0)
                        {
                            uint8_t qosRet = byteutil_readByte(&iterator);
                            if (qosRet & 0x7C) // SUBACK QOS bits 6-2 must be zero
                            {
                                LogError("Invalid SUBACK_TYPE packet.");
                                set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
                                break;
                            }
                            suback.qosReturn[suback.qosCount++] =
                                (qosRet <= ((uint8_t)DELIVER_EXACTLY_ONCE)) ?
                                (QOS_VALUE)qosRet :  DELIVER_FAILURE;
                            remainLen--;
#ifndef NO_LOGGING
                            if (is_trace_enabled(mqtt_client))
                            {
                                STRING_sprintf(trace_log, " | RETURN_CODE: %"PRIu16, suback.qosReturn[suback.qosCount-1]);
                            }
#endif
                        }

#ifndef NO_LOGGING
                        if (is_trace_enabled(mqtt_client))
                        {
                            log_incoming_trace(mqtt_client, trace_log);
                            STRING_delete(trace_log);
                        }
#endif
                        mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqtt_client->ctx);
                        free(suback.qosReturn);
                    }
                    else
                    {
                        LogError("allocation of quality of service value failed, size:%zu", malloc_size);
                        set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                    }
                    break;
                }
                case UNSUBACK_TYPE:
                {
                    /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
                    UNSUBSCRIBE_ACK unsuback = { 0 };
                    if (packetLength != 2) // UNSUBACK_TYPE payload must be only 2 bytes
                    {
                        LogError("Invalid UNSUBACK packet length.");
                        set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
                        break;
                    }

                    unsuback.packetId = byteutil_read_uint16(&iterator, packetLength);

#ifndef NO_LOGGING
                    if (is_trace_enabled(mqtt_client))
                    {
                        STRING_HANDLE trace_log = STRING_construct_sprintf("UNSUBACK | PACKET_ID: %"PRIu16, unsuback.packetId);
                        log_incoming_trace(mqtt_client, trace_log);
                        STRING_delete(trace_log);
                    }
#endif
                    mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqtt_client->ctx);
                    break;
                }
                case PINGRESP_TYPE:
                    mqtt_client->timeSincePing = 0;
#ifndef NO_LOGGING
                    if (is_trace_enabled(mqtt_client))
                    {
                        STRING_HANDLE trace_log = STRING_construct_sprintf("PINGRESP");
                        log_incoming_trace(mqtt_client, trace_log);
                        STRING_delete(trace_log);
                    }
#endif
                    // Forward ping response to operation callback
                    mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_PING_RESPONSE, NULL, mqtt_client->ctx);
                    break;
                default:
                    break;
            }
        }
    }
    else
    {
        LogError("recvCompleteCallback context failed.");
    }
}