in source/MutualAuthMQTTExample.c [469:590]
void RunMQTTTask( void * pvParameters )
{
uint32_t ulPublishCount = 0U, ulTopicCount = 0U;
const uint32_t ulMaxPublishCount = 5UL;
NetworkContext_t xNetworkContext = { 0 };
NetworkCredentials_t xNetworkCredentials = { 0 };
MQTTContext_t xMQTTContext = { 0 };
MQTTStatus_t xMQTTStatus;
TlsTransportStatus_t xNetworkStatus;
/* Remove compiler warnings about unused parameters. */
( void ) pvParameters;
/* Set the entry time of the demo application. This entry time will be used
* to calculate relative time elapsed in the execution of the demo application,
* by the timer utility function that is provided to the MQTT library.
*/
ulGlobalEntryTimeMs = prvGetTimeMs();
#ifdef USE_1NCE_ZERO_TOUCH_PROVISIONING
uint8_t status = nce_onboard( &pThingName,
&pEndpoint,
&pExampleTopic,
&pRootCA,
&pClientCert,
&pPrvKey );
configASSERT( status == EXIT_SUCCESS );
#else
pThingName = democonfigCLIENT_IDENTIFIER;
pEndpoint = democonfigMQTT_BROKER_ENDPOINT;
pExampleTopic = mqttexampleTOPIC;
#endif /* ifdef USE_1NCE_ZERO_TOUCH_PROVISIONING */
for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
{
xTopicFilterContext[ ulTopicCount ].pcTopicFilter = pExampleTopic;
}
for( ; ; )
{
/****************************** Connect. ******************************/
/* Attempt to establish TLS session with MQTT broker. If connection fails,
* retry after a timeout. Timeout value will be exponentially increased
* until the maximum number of attempts are reached or the maximum timeout
* value is reached. The function returns a failure status if the TCP
* connection cannot be established to the broker after the configured
* number of attempts. */
xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkCredentials,
&xNetworkContext );
configASSERT( xNetworkStatus == TLS_TRANSPORT_SUCCESS );
/* Sends an MQTT Connect packet over the already established TLS connection,
* and waits for connection acknowledgment (CONNACK) packet. */
LogInfo( ( "Creating an MQTT connection to %s.\r\n", pEndpoint ) );
prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );
/**************************** Subscribe. ******************************/
/* If server rejected the subscription request, attempt to resubscribe to
* topic. Attempts are made according to the exponential backoff retry
* strategy implemented in BackoffAlgorithm. */
prvMQTTSubscribeWithBackoffRetries( &xMQTTContext );
/****************** Publish and Keep Alive Loop. **********************/
/* Publish messages with QoS1, send and process Keep alive messages. */
for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )
{
LogInfo( ( "Publish to the MQTT topic %s.\r\n", pExampleTopic ) );
prvMQTTPublishToTopic( &xMQTTContext );
/* Process incoming publish echo, since application subscribed to the
* same topic, the broker will send publish message back to the
* application. */
LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) );
xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
configASSERT( xMQTTStatus == MQTTSuccess );
/* Leave Connection Idle for some time. */
LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS );
}
/******************** Unsubscribe from the topic. *********************/
LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", pExampleTopic ) );
prvMQTTUnsubscribeFromTopic( &xMQTTContext );
/* Process incoming UNSUBACK packet from the broker. */
xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
configASSERT( xMQTTStatus == MQTTSuccess );
/**************************** Disconnect. *****************************/
/* Send an MQTT Disconnect packet over the already connected TLS over
* TCP connection. There is no corresponding response for the disconnect
* packet. After sending disconnect, client must close the network
* connection. */
LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n",
pEndpoint ) );
xMQTTStatus = MQTT_Disconnect( &xMQTTContext );
configASSERT( xMQTTStatus == MQTTSuccess );
/* Close the network connection. */
TLS_FreeRTOS_Disconnect( &xNetworkContext );
/* Reset SUBACK status for each topic filter after completion of
* subscription request cycle. */
for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
{
xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure;
}
/* Wait for some time between two iterations to ensure that we do not
* bombard the broker. */
LogInfo( ( "RunMQTTTask() completed an iteration successfully. "
"Total free heap is %u.\r\n",
xPortGetFreeHeapSize() ) );
LogInfo( ( "Demo completed successfully.\r\n" ) );
LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS );
}
}