static void prvMQTTAgentLoop()

in source/core_mqtt_agent.c [159:267]


/**
 * @brief Task function that services the MQTT agent's operations queue.
 *
 * Sets isAgentRunning to pdTRUE, then repeatedly takes an operation pointer
 * from xOperationsQueue and dispatches on its type:
 *  - MQTT_OP_RECEIVE: runs MQTT_ProcessLoop() and puts the same operation
 *    pointer back on the queue.
 *  - MQTT_OP_PUBLISH: sends the publish. QoS0 publishes and failed sends are
 *    completed immediately through the operation's callback; otherwise the
 *    operation is recorded with addPendingOperation().
 *  - MQTT_OP_SUBSCRIBE / MQTT_OP_UNSUBSCRIBE: sends the request. Failures are
 *    reported through the callback; successes are recorded with
 *    addPendingOperation().
 *  - MQTT_OP_STOP: resets the queue and invokes the callback with MQTTSuccess.
 *
 * The loop ends the first time xQueueReceive() does not yield an item within
 * one tick. The queue is then deleted, isAgentRunning is set to pdFALSE and
 * the calling task deletes itself.
 *
 * @param[in] pParams Pointer to the MQTTContext_t used for every MQTT call.
 */
static void prvMQTTAgentLoop( void * pParams )
{
    BaseType_t status;
    BaseType_t pendingAdded;
    MQTTOperation_t * pOperation;
    uint16_t packetIdentifier = 0;
    MQTTContext_t * pMQTTContext = ( MQTTContext_t * ) pParams;
    MQTTStatus_t mqttStatus;

    isAgentRunning = pdTRUE;

    for( ; ; )
    {
        status = xQueueReceive( xOperationsQueue, &pOperation, 1 );

        if( status == pdTRUE )
        {
            switch( pOperation->type )
            {
                case MQTT_OP_RECEIVE:
                    mqttStatus = MQTT_ProcessLoop( pMQTTContext, MQTT_AGENT_MAX_POLLING_INTERVAL_MS );
                    configASSERT( mqttStatus == MQTTSuccess );

                    /* Put the receive operation back so that it is processed
                     * again on a later iteration.
                     * NOTE(review): the return value is not checked; if the
                     * queue is full the receive operation is not re-queued. */
                    xQueueSend( xOperationsQueue, &pOperation, 1 );
                    break;

                case MQTT_OP_PUBLISH:

                    /* Only QoS1/QoS2 publishes take a packet identifier from
                     * MQTT_GetPacketId(); QoS0 uses 0. */
                    if( pOperation->info.pPublishInfo->qos != MQTTQoS0 )
                    {
                        packetIdentifier = MQTT_GetPacketId( pMQTTContext );
                    }
                    else
                    {
                        packetIdentifier = 0;
                    }

                    mqttStatus = MQTT_Publish( pMQTTContext, pOperation->info.pPublishInfo, packetIdentifier );

                    if( ( mqttStatus != MQTTSuccess ) || ( pOperation->info.pPublishInfo->qos == MQTTQoS0 ) )
                    {
                        /* Nothing further is tracked for this operation, so
                         * complete it now with the send status. */
                        pOperation->callback( pOperation, mqttStatus );
                    }
                    else
                    {
                        pOperation->packetIdentifier = packetIdentifier;

                        /* The call is made outside configASSERT() because the
                         * macro may be defined to expand to nothing, which
                         * would silently drop the call. */
                        pendingAdded = addPendingOperation( pOperation );
                        configASSERT( pendingAdded == pdTRUE );
                        ( void ) pendingAdded;
                    }

                    break;

                case MQTT_OP_SUBSCRIBE:
                    packetIdentifier = MQTT_GetPacketId( pMQTTContext );
                    mqttStatus = MQTT_Subscribe( pMQTTContext,
                                                 pOperation->info.subscriptionInfo.pSubscriptionList,
                                                 pOperation->info.subscriptionInfo.numSubscriptions,
                                                 packetIdentifier );

                    if( mqttStatus != MQTTSuccess )
                    {
                        pOperation->callback( pOperation, mqttStatus );
                    }
                    else
                    {
                        pOperation->packetIdentifier = packetIdentifier;

                        /* Keep the side effect outside configASSERT(). */
                        pendingAdded = addPendingOperation( pOperation );
                        configASSERT( pendingAdded == pdTRUE );
                        ( void ) pendingAdded;
                    }

                    break;

                case MQTT_OP_UNSUBSCRIBE:
                    packetIdentifier = MQTT_GetPacketId( pMQTTContext );
                    mqttStatus = MQTT_Unsubscribe( pMQTTContext,
                                                   pOperation->info.subscriptionInfo.pSubscriptionList,
                                                   pOperation->info.subscriptionInfo.numSubscriptions,
                                                   packetIdentifier );

                    if( mqttStatus != MQTTSuccess )
                    {
                        pOperation->callback( pOperation, mqttStatus );
                    }
                    else
                    {
                        pOperation->packetIdentifier = packetIdentifier;

                        /* Keep the side effect outside configASSERT(). */
                        pendingAdded = addPendingOperation( pOperation );
                        configASSERT( pendingAdded == pdTRUE );
                        ( void ) pendingAdded;
                    }

                    break;

                case MQTT_OP_STOP:
                    /* Reset the operations queue to empty state to stop the agent:
                     * the next xQueueReceive() then finds nothing and the loop
                     * exits through the else branch below. */
                    xQueueReset( xOperationsQueue );
                    pOperation->callback( pOperation, MQTTSuccess );
                    break;

                default:
                    /* Unrecognised operation types are ignored. */
                    break;
            }
        }
        else
        {
            /* No operation arrived within one tick: leave the loop. */
            break;
        }
    }

    vQueueDelete( xOperationsQueue );

    isAgentRunning = pdFALSE;

    /* Passing NULL deletes the calling task. */
    vTaskDelete( NULL );
}