static void prvSimpleSubscribePublishTask()

in source/demo-tasks/simple_sub_pub_demo.c [435:555]


static void prvSimpleSubscribePublishTask( void * pvParameters )
{
    extern UBaseType_t uxRand( void );
    MQTTPublishInfo_t xPublishInfo = { 0UL };
    char payloadBuf[ mqttexampleSTRING_BUFFER_LENGTH ];
    char taskName[ mqttexampleSTRING_BUFFER_LENGTH ];
    MQTTAgentCommandContext_t xCommandContext;
    uint32_t ulNotification = 0U, ulValueToNotify = 0UL;
    MQTTStatus_t xCommandAdded;
    uint32_t ulTaskNumber = ( uint32_t ) pvParameters;
    MQTTQoS_t xQoS;
    TickType_t xTicksToDelay;
    MQTTAgentCommandInfo_t xCommandParams = { 0UL };
    char * pcTopicBuffer = topicBuf[ ulTaskNumber ];
    static volatile uint32_t * ulPassCounts[] = { ulQoS0PassCount, ulQoS1PassCount };
    static volatile uint32_t * ulFailCounts[] = { ulQoS0FailCount, ulQoS1FailCount };

    /* Have different tasks use different QoS.  0 and 1.  2 can also be used
     * if supported by the broker. */
    xQoS = ( MQTTQoS_t ) ( ulTaskNumber % 2UL );

    /* Create a unique name for this task from the task number that is passed into
     * the task using the task's parameter. */
    snprintf( taskName, mqttexampleSTRING_BUFFER_LENGTH, "Publisher%d", ( int ) ulTaskNumber );

    /* Create a topic name for this task to publish to. */
    snprintf( pcTopicBuffer, mqttexampleSTRING_BUFFER_LENGTH, "/filter/%s", taskName );

    /* Subscribe to the same topic to which this task will publish.  That will
     * result in each published message being published from the server back to
     * the target. */
    prvSubscribeToTopic( xQoS, pcTopicBuffer );

    /* Configure the publish operation. */
    memset( ( void * ) &xPublishInfo, 0x00, sizeof( xPublishInfo ) );
    xPublishInfo.qos = xQoS;
    xPublishInfo.pTopicName = pcTopicBuffer;
    xPublishInfo.topicNameLength = ( uint16_t ) strlen( pcTopicBuffer );
    xPublishInfo.pPayload = payloadBuf;

    /* Store the handler to this task in the command context so the callback
     * that executes when the command is acknowledged can send a notification
     * back to this task. */
    memset( ( void * ) &xCommandContext, 0x00, sizeof( xCommandContext ) );
    xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle();

    xCommandParams.blockTimeMs = mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS;
    xCommandParams.cmdCompleteCallback = prvPublishCommandCallback;
    xCommandParams.pCmdCompleteCallbackContext = &xCommandContext;

    /* For a finite number of publishes... */
    for( ulValueToNotify = 0UL; ulValueToNotify < mqttexamplePUBLISH_COUNT; ulValueToNotify++ )
    {
        /* Create a payload to send with the publish message.  This contains
         * the task name and an incrementing number. */
        snprintf( payloadBuf,
                  mqttexampleSTRING_BUFFER_LENGTH,
                  "%s publishing message %d",
                  taskName,
                  ( int ) ulValueToNotify );

        xPublishInfo.payloadLength = ( uint16_t ) strlen( payloadBuf );

        /* Also store the incrementing number in the command context so it can
         * be accessed by the callback that executes when the publish operation
         * is acknowledged. */
        xCommandContext.ulNotificationValue = ulValueToNotify;

        LogInfo( ( "Sending publish request to agent with message \"%s\" on topic \"%s\"",
                   payloadBuf,
                   pcTopicBuffer ) );

        /* To ensure ulNotification doesn't accidentally hold the expected value
         * as it is to be checked against the value sent from the callback.. */
        ulNotification = ~ulValueToNotify;

        xCommandAdded = MQTTAgent_Publish( &xGlobalMqttAgentContext,
                                           &xPublishInfo,
                                           &xCommandParams );
        configASSERT( xCommandAdded == MQTTSuccess );

        /* For QoS 1 and 2, wait for the publish acknowledgment.  For QoS0,
         * wait for the publish to be sent. */
        LogInfo( ( "Task %s waiting for publish %d to complete.",
                   taskName,
                   ulValueToNotify ) );
        prvWaitForCommandAcknowledgment( &ulNotification );

        /* The value received by the callback that executed when the publish was
         * acked came from the context passed into MQTTAgent_Publish() above, so
         * should match the value set in the context above. */
        if( ulNotification == ulValueToNotify )
        {
            ( ulPassCounts[ xQoS ][ ulTaskNumber >> 1 ] )++;
            LogInfo( ( "Rx'ed %s from Tx to %s (P%d:F%d).",
                       ( xQoS == 0 ) ? "completion notification for QoS0 publish" : "ack for QoS1 publish",
                       pcTopicBuffer,
                       ulPassCounts[ xQoS ][ ulTaskNumber >> 1 ],
                       ulFailCounts[ xQoS ][ ulTaskNumber >> 1 ] ) );
        }
        else
        {
            ( ulFailCounts[ xQoS ][ ulTaskNumber >> 1 ] )++;
            LogError( ( "Timed out Rx'ing %s from Tx to %s (P%d:F%d)",
                        ( xQoS == 0 ) ? "completion notification for QoS0 publish" : "ack for QoS1 publish",
                        pcTopicBuffer,
                        ulPassCounts[ xQoS ][ ulTaskNumber >> 1 ],
                        ulFailCounts[ xQoS ][ ulTaskNumber >> 1 ] ) );
        }

        /* Add a little randomness into the delay so the tasks don't remain
         * in lockstep. */
        xTicksToDelay = pdMS_TO_TICKS( mqttexampleDELAY_BETWEEN_PUBLISH_OPERATIONS_MS ) +
                        ( uxRand() % 0xff );
        vTaskDelay( xTicksToDelay );
    }

    /* Delete the task if it is complete. */
    LogInfo( ( "Task %s completed.", taskName ) );
    vTaskDelete( NULL );
}