in source/core_mqtt_agent.c [159:267]
/**
 * @brief Task function that services the MQTT agent's operations queue.
 *
 * Sets isAgentRunning to pdTRUE, then repeatedly takes one operation at a
 * time from xOperationsQueue and dispatches on its type. The loop ends the
 * first time xQueueReceive() does not return an operation within one tick;
 * the queue is then deleted, isAgentRunning is cleared and the calling task
 * deletes itself, so this function never returns to its caller.
 *
 * @param[in] pParams Pointer that is cast to the MQTTContext_t used for every
 * MQTT library call made by the loop.
 */
static void prvMQTTAgentLoop( void * pParams )
{
    BaseType_t status;
    BaseType_t operationStored;
    BaseType_t receiveRequeued;
    MQTTOperation_t * pOperation = NULL;
    uint16_t packetIdentifier = 0;
    MQTTContext_t * pMQTTContext = ( MQTTContext_t * ) pParams;
    MQTTStatus_t mqttStatus;

    isAgentRunning = pdTRUE;

    for( ; ; )
    {
        /* Wait at most one tick for the next operation. */
        status = xQueueReceive( xOperationsQueue, &pOperation, 1 );

        if( status != pdTRUE )
        {
            /* Nothing was queued within the timeout: leave the loop and shut
             * the agent down. This is also how MQTT_OP_STOP takes effect. */
            break;
        }

        switch( pOperation->type )
        {
            case MQTT_OP_RECEIVE:
                mqttStatus = MQTT_ProcessLoop( pMQTTContext, MQTT_AGENT_MAX_POLLING_INTERVAL_MS );
                configASSERT( mqttStatus == MQTTSuccess );

                /* Put the receive operation back on the queue so that it is
                 * picked up again on a later iteration. If this fails the
                 * operation is lost, so the result is checked rather than
                 * discarded. */
                receiveRequeued = xQueueSend( xOperationsQueue, &pOperation, 1 );
                configASSERT( receiveRequeued == pdTRUE );
                ( void ) receiveRequeued;
                break;

            case MQTT_OP_PUBLISH:

                /* Only publishes above QoS 0 are given a packet identifier. */
                if( pOperation->info.pPublishInfo->qos != MQTTQoS0 )
                {
                    packetIdentifier = MQTT_GetPacketId( pMQTTContext );
                }
                else
                {
                    packetIdentifier = 0;
                }

                mqttStatus = MQTT_Publish( pMQTTContext, pOperation->info.pPublishInfo, packetIdentifier );

                if( ( mqttStatus != MQTTSuccess ) || ( pOperation->info.pPublishInfo->qos == MQTTQoS0 ) )
                {
                    /* Failed sends and QoS 0 publishes are completed right
                     * away by invoking the operation's callback. */
                    pOperation->callback( pOperation, mqttStatus );
                }
                else
                {
                    pOperation->packetIdentifier = packetIdentifier;

                    /* Call addPendingOperation() outside of configASSERT() so
                     * that the operation is still recorded when the assert
                     * macro is compiled out. */
                    operationStored = addPendingOperation( pOperation );
                    configASSERT( operationStored == pdTRUE );
                    ( void ) operationStored;
                }

                break;

            case MQTT_OP_SUBSCRIBE:
                packetIdentifier = MQTT_GetPacketId( pMQTTContext );
                mqttStatus = MQTT_Subscribe( pMQTTContext,
                                             pOperation->info.subscriptionInfo.pSubscriptionList,
                                             pOperation->info.subscriptionInfo.numSubscriptions,
                                             packetIdentifier );

                if( mqttStatus != MQTTSuccess )
                {
                    pOperation->callback( pOperation, mqttStatus );
                }
                else
                {
                    pOperation->packetIdentifier = packetIdentifier;

                    /* See MQTT_OP_PUBLISH: the call must not live inside the
                     * assert expression. */
                    operationStored = addPendingOperation( pOperation );
                    configASSERT( operationStored == pdTRUE );
                    ( void ) operationStored;
                }

                break;

            case MQTT_OP_UNSUBSCRIBE:
                packetIdentifier = MQTT_GetPacketId( pMQTTContext );
                mqttStatus = MQTT_Unsubscribe( pMQTTContext,
                                               pOperation->info.subscriptionInfo.pSubscriptionList,
                                               pOperation->info.subscriptionInfo.numSubscriptions,
                                               packetIdentifier );

                if( mqttStatus != MQTTSuccess )
                {
                    pOperation->callback( pOperation, mqttStatus );
                }
                else
                {
                    pOperation->packetIdentifier = packetIdentifier;

                    /* See MQTT_OP_PUBLISH: the call must not live inside the
                     * assert expression. */
                    operationStored = addPendingOperation( pOperation );
                    configASSERT( operationStored == pdTRUE );
                    ( void ) operationStored;
                }

                break;

            case MQTT_OP_STOP:

                /* Reset the operations queue to empty state to stop the agent.
                 * The break below only leaves the switch; the loop itself ends
                 * when the next xQueueReceive() finds the queue empty.
                 * NOTE(review): an operation queued by another task before
                 * that receive times out would keep the loop running -
                 * confirm callers stop submitting work before requesting a
                 * stop. */
                xQueueReset( xOperationsQueue );
                pOperation->callback( pOperation, MQTTSuccess );
                break;

            default:
                /* Unrecognised operation types are ignored. */
                break;
        }
    }

    /* Tear down: release the queue, mark the agent as stopped and delete the
     * calling task. */
    vQueueDelete( xOperationsQueue );
    isAgentRunning = pdFALSE;
    vTaskDelete( NULL );
}