static MQTTStatus_t resendPublishes()

in source/core_mqtt_agent.c [795:834]


/**
 * @brief Re-transmit every publish that MQTT_PublishToResend() reports.
 *
 * Iterates the packet IDs handed back by MQTT_PublishToResend() until it
 * returns MQTT_PACKET_ID_INVALID. For each ID that has a matching entry from
 * getAwaitingOperation(), the stored publish is marked as a duplicate and
 * sent again with the same packet ID. IDs without a matching entry are
 * skipped.
 *
 * The first failed MQTT_Publish() ends the iteration: the originating command
 * is concluded with the failure status, the acknowledgment record is zeroed,
 * and the error is logged.
 *
 * @param[in] pMqttAgentContext Agent context; must not be NULL (asserted).
 *
 * @return MQTTSuccess if no publish was attempted or every attempted
 * MQTT_Publish() succeeded; otherwise the status returned by the
 * MQTT_Publish() call that failed.
 */
static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext )
{
    MQTTStatus_t status = MQTTSuccess;
    MQTTStateCursor_t resendCursor = MQTT_STATE_CURSOR_INITIALIZER;
    uint16_t resendId = MQTT_PACKET_ID_INVALID;
    MQTTAgentAckInfo_t * pAckInfo = NULL;
    MQTTPublishInfo_t * pPublishInfo = NULL;
    MQTTContext_t * pContext;

    assert( pMqttAgentContext != NULL );
    pContext = &( pMqttAgentContext->mqttContext );

    /* The cursor carries the iteration position between calls, so the same
     * call both primes and advances the loop. */
    for( resendId = MQTT_PublishToResend( pContext, &resendCursor );
         resendId != MQTT_PACKET_ID_INVALID;
         resendId = MQTT_PublishToResend( pContext, &resendCursor ) )
    {
        /* Look the operation up only; it stays in the awaiting list. */
        pAckInfo = getAwaitingOperation( pMqttAgentContext, resendId );

        if( pAckInfo == NULL )
        {
            /* Nothing recorded for this packet ID; move on to the next one. */
            continue;
        }

        /* The command arguments hold the original publish; flag it as a
         * duplicate before sending it again under the same packet ID. */
        pPublishInfo = ( MQTTPublishInfo_t * ) ( pAckInfo->pOriginalCommand->pArgs );
        pPublishInfo->dup = true;
        status = MQTT_Publish( pContext, pPublishInfo, resendId );

        if( status != MQTTSuccess )
        {
            /* Conclude the command first: it reads pOriginalCommand, which the
             * memset below wipes together with the rest of the record. */
            concludeCommand( pMqttAgentContext, pAckInfo->pOriginalCommand, status, NULL );
            ( void ) memset( pAckInfo, 0x00, sizeof( MQTTAgentAckInfo_t ) );
            LogError( ( "Failed to resend publishes. Error code=%s\n", MQTT_Status_strerror( status ) ) );
            break;
        }
    }

    return status;
}