in source/demo-tasks/simple_sub_pub_demo.c [435:555]
/*
 * Demo task: subscribes to a topic that is unique to this task and then
 * publishes mqttexamplePUBLISH_COUNT messages to that same topic through the
 * MQTT agent. After each publish the task waits for the command-complete
 * notification and records a pass or a fail in the per-QoS counters. The task
 * deletes itself once all publishes have been attempted.
 *
 * pvParameters carries the task number *by value* (an integer cast to a
 * pointer); it is never dereferenced. Even task numbers publish at QoS0, odd
 * task numbers at QoS1.
 */
static void prvSimpleSubscribePublishTask( void * pvParameters )
{
    extern UBaseType_t uxRand( void );
    MQTTPublishInfo_t xPublishInfo = { 0UL };
    char payloadBuf[ mqttexampleSTRING_BUFFER_LENGTH ];
    char taskName[ mqttexampleSTRING_BUFFER_LENGTH ];
    MQTTAgentCommandContext_t xCommandContext;
    uint32_t ulNotification = 0U, ulValueToNotify = 0UL;
    MQTTStatus_t xCommandAdded;

    /* Convert via uintptr_t so the pointer-to-integer conversion is well
     * formed (and warning free) even when pointers are wider than 32 bits. */
    uint32_t ulTaskNumber = ( uint32_t ) ( uintptr_t ) pvParameters;
    MQTTQoS_t xQoS;
    TickType_t xTicksToDelay;
    MQTTAgentCommandInfo_t xCommandParams = { 0UL };
    char * pcTopicBuffer = topicBuf[ ulTaskNumber ];
    static volatile uint32_t * ulPassCounts[] = { ulQoS0PassCount, ulQoS1PassCount };
    static volatile uint32_t * ulFailCounts[] = { ulQoS0FailCount, ulQoS1FailCount };

    /* Tasks alternate between QoS0 and QoS1, so each QoS counter array is
     * indexed by half of the task number. */
    const uint32_t ulCounterIndex = ulTaskNumber >> 1;
    volatile uint32_t * pulPassCount;
    volatile uint32_t * pulFailCount;
    const char * pcCompletionDescription;

    /* Have different tasks use different QoS.  0 and 1.  2 can also be used
     * if supported by the broker. */
    xQoS = ( MQTTQoS_t ) ( ulTaskNumber % 2UL );

    /* These do not change for the lifetime of the task, so resolve them once
     * rather than on every loop iteration. */
    pulPassCount = &( ulPassCounts[ xQoS ][ ulCounterIndex ] );
    pulFailCount = &( ulFailCounts[ xQoS ][ ulCounterIndex ] );
    pcCompletionDescription = ( xQoS == 0 ) ? "completion notification for QoS0 publish" : "ack for QoS1 publish";

    /* Create a unique name for this task from the task number that is passed into
     * the task using the task's parameter. */
    snprintf( taskName, mqttexampleSTRING_BUFFER_LENGTH, "Publisher%d", ( int ) ulTaskNumber );

    /* Create a topic name for this task to publish to. */
    snprintf( pcTopicBuffer, mqttexampleSTRING_BUFFER_LENGTH, "/filter/%s", taskName );

    /* Subscribe to the same topic to which this task will publish.  That will
     * result in each published message being published from the server back to
     * the target. */
    prvSubscribeToTopic( xQoS, pcTopicBuffer );

    /* Configure the publish operation. */
    memset( ( void * ) &xPublishInfo, 0x00, sizeof( xPublishInfo ) );
    xPublishInfo.qos = xQoS;
    xPublishInfo.pTopicName = pcTopicBuffer;
    xPublishInfo.topicNameLength = ( uint16_t ) strlen( pcTopicBuffer );
    xPublishInfo.pPayload = payloadBuf;

    /* Store the handle of this task in the command context so the callback
     * that executes when the command is acknowledged can send a notification
     * back to this task. */
    memset( ( void * ) &xCommandContext, 0x00, sizeof( xCommandContext ) );
    xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle();

    xCommandParams.blockTimeMs = mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS;
    xCommandParams.cmdCompleteCallback = prvPublishCommandCallback;
    xCommandParams.pCmdCompleteCallbackContext = &xCommandContext;

    /* For a finite number of publishes... */
    for( ulValueToNotify = 0UL; ulValueToNotify < mqttexamplePUBLISH_COUNT; ulValueToNotify++ )
    {
        /* Create a payload to send with the publish message.  This contains
         * the task name and an incrementing number. */
        snprintf( payloadBuf,
                  mqttexampleSTRING_BUFFER_LENGTH,
                  "%s publishing message %d",
                  taskName,
                  ( int ) ulValueToNotify );

        xPublishInfo.payloadLength = ( uint16_t ) strlen( payloadBuf );

        /* Also store the incrementing number in the command context so it can
         * be accessed by the callback that executes when the publish operation
         * is acknowledged. */
        xCommandContext.ulNotificationValue = ulValueToNotify;

        LogInfo( ( "Sending publish request to agent with message \"%s\" on topic \"%s\"",
                   payloadBuf,
                   pcTopicBuffer ) );

        /* Make sure ulNotification cannot accidentally already hold the
         * expected value, as it is checked below against the value sent from
         * the callback. */
        ulNotification = ~ulValueToNotify;

        xCommandAdded = MQTTAgent_Publish( &xGlobalMqttAgentContext,
                                           &xPublishInfo,
                                           &xCommandParams );
        configASSERT( xCommandAdded == MQTTSuccess );

        /* For QoS 1 and 2, wait for the publish acknowledgment.  For QoS0,
         * wait for the publish to be sent. */
        LogInfo( ( "Task %s waiting for publish %lu to complete.",
                   taskName,
                   ( unsigned long ) ulValueToNotify ) );
        prvWaitForCommandAcknowledgment( &ulNotification );

        /* The value received by the callback that executed when the publish was
         * acked came from the context passed into MQTTAgent_Publish() above, so
         * should match the value set in the context above. */
        if( ulNotification == ulValueToNotify )
        {
            ( *pulPassCount )++;
            LogInfo( ( "Rx'ed %s from Tx to %s (P%lu:F%lu).",
                       pcCompletionDescription,
                       pcTopicBuffer,
                       ( unsigned long ) *pulPassCount,
                       ( unsigned long ) *pulFailCount ) );
        }
        else
        {
            ( *pulFailCount )++;
            LogError( ( "Timed out Rx'ing %s from Tx to %s (P%lu:F%lu)",
                        pcCompletionDescription,
                        pcTopicBuffer,
                        ( unsigned long ) *pulPassCount,
                        ( unsigned long ) *pulFailCount ) );
        }

        /* Add a little randomness into the delay so the tasks don't remain
         * in lockstep. */
        xTicksToDelay = pdMS_TO_TICKS( mqttexampleDELAY_BETWEEN_PUBLISH_OPERATIONS_MS ) +
                        ( uxRand() % 0xff );
        vTaskDelay( xTicksToDelay );
    }

    /* Delete the task if it is complete. */
    LogInfo( ( "Task %s completed.", taskName ) );
    vTaskDelete( NULL );
}