IotMqttError_t IotMqtt_Connect()

in device_firmware/libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c [885:1233]


IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
                                const IotMqttConnectInfo_t * pConnectInfo,
                                uint32_t timeoutMs,
                                IotMqttConnection_t * const pMqttConnection )
{
    IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
    bool networkCreated = false, ownNetworkConnection = false;
    IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
    void * pNetworkConnection = NULL;
    _mqttOperation_t * pOperation = NULL;
    _mqttConnection_t * pNewMqttConnection = NULL;

    /* Default CONNECT serializer function. */
    IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,
                                           uint8_t **,
                                           size_t * ) = _IotMqtt_SerializeConnect;

    /* Network info must not be NULL. */
    if( pNetworkInfo == NULL )
    {
        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Validate network interface and connect info. */
    if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )
    {
        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* If will info is provided, check that it is valid. */
    if( pConnectInfo->pWillInfo != NULL )
    {
        if( _IotMqtt_ValidatePublish( pConnectInfo->awsIotMqttMode,
                                      pConnectInfo->pWillInfo ) == false )
        {
            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
        }
        else if( pConnectInfo->pWillInfo->payloadLength > UINT16_MAX )
        {
            /* Will message payloads cannot be larger than 65535. This restriction
             * applies only to will messages, and not normal PUBLISH messages. */
            IotLogError( "Will payload cannot be larger than 65535." );

            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* If previous subscriptions are provided, check that they are valid. */
    if( pConnectInfo->cleanSession == false )
    {
        if( pConnectInfo->pPreviousSubscriptions != NULL )
        {
            if( _IotMqtt_ValidateSubscriptionList( IOT_MQTT_SUBSCRIBE,
                                                   pConnectInfo->awsIotMqttMode,
                                                   pConnectInfo->pPreviousSubscriptions,
                                                   pConnectInfo->previousSubscriptionCount ) == false )
            {
                IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Create a new MQTT connection if requested. Otherwise, copy the existing
     * network connection. */
    if( pNetworkInfo->createNetworkConnection == true )
    {
        networkStatus = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,
                                                                 pNetworkInfo->u.setup.pNetworkCredentialInfo,
                                                                 &pNetworkConnection );

        if( networkStatus == IOT_NETWORK_SUCCESS )
        {
            networkCreated = true;

            /* This MQTT connection owns the network connection it created and
             * should destroy it on cleanup. */
            ownNetworkConnection = true;
        }
        else
        {
            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
        }
    }
    else
    {
        pNetworkConnection = pNetworkInfo->u.pNetworkConnection;
        networkCreated = true;
    }

    IotLogInfo( "Establishing new MQTT connection." );

    /* Initialize a new MQTT connection object. */
    pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,
                                                pNetworkInfo,
                                                pConnectInfo->keepAliveSeconds );

    if( pNewMqttConnection == NULL )
    {
        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
    }
    else
    {
        /* Set the network connection associated with the MQTT connection. */
        pNewMqttConnection->pNetworkConnection = pNetworkConnection;
        pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;

        /* Set the MQTT packet serializer overrides. */
        #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
            pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;
        #endif
    }

    /* Set the MQTT receive callback. */
    networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,
                                                                               IotMqtt_ReceiveCallback,
                                                                               pNewMqttConnection );

    if( networkStatus != IOT_NETWORK_SUCCESS )
    {
        IotLogError( "Failed to set MQTT network receive callback." );

        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Create a CONNECT operation. */
    status = _IotMqtt_CreateOperation( pNewMqttConnection,
                                       IOT_MQTT_FLAG_WAITABLE,
                                       NULL,
                                       &pOperation );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Ensure the members set by operation creation and serialization
     * are appropriate for a blocking CONNECT. */
    IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
    IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
                    == IOT_MQTT_FLAG_WAITABLE );
    IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );

    /* Set the operation type. */
    pOperation->u.operation.type = IOT_MQTT_CONNECT;

    /* Add previous session subscriptions. */
    if( pConnectInfo->pPreviousSubscriptions != NULL )
    {
        /* Previous subscription count should have been validated as nonzero. */
        IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );

        status = _IotMqtt_AddSubscriptions( pNewMqttConnection,
                                            2,
                                            pConnectInfo->pPreviousSubscriptions,
                                            pConnectInfo->previousSubscriptionCount );

        if( status != IOT_MQTT_SUCCESS )
        {
            IOT_GOTO_CLEANUP();
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Choose a CONNECT serializer function. */
    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
        if( pNewMqttConnection->pSerializer != NULL )
        {
            if( pNewMqttConnection->pSerializer->serialize.connect != NULL )
            {
                serializeConnect = pNewMqttConnection->pSerializer->serialize.connect;
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */

    /* Convert the connect info and will info objects to an MQTT CONNECT packet. */
    status = serializeConnect( pConnectInfo,
                               &( pOperation->u.operation.pMqttPacket ),
                               &( pOperation->u.operation.packetSize ) );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Check the serialized MQTT packet. */
    IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
    IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );

    /* Add the CONNECT operation to the send queue for network transmission. */
    status = _IotMqtt_ScheduleOperation( pOperation,
                                         _IotMqtt_ProcessSend,
                                         0 );

    if( status != IOT_MQTT_SUCCESS )
    {
        IotLogError( "Failed to enqueue CONNECT for sending." );
    }
    else
    {
        /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */
        status = IotMqtt_Wait( pOperation,
                               timeoutMs );

        /* The call to wait cleans up the CONNECT operation, so set the pointer
         * to NULL. */
        pOperation = NULL;
    }

    /* When a connection is successfully established, schedule keep-alive job. */
    if( status == IOT_MQTT_SUCCESS )
    {
        /* Check if a keep-alive job should be scheduled. */
        if( pNewMqttConnection->keepAliveMs != 0 )
        {
            IotLogDebug( "Scheduling first MQTT keep-alive job." );

            taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
                                                           pNewMqttConnection->keepAliveJob,
                                                           pNewMqttConnection->nextKeepAliveMs );

            if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
            {
                IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    IOT_FUNCTION_CLEANUP_BEGIN();

    if( status != IOT_MQTT_SUCCESS )
    {
        IotLogError( "Failed to establish new MQTT connection, error %s.",
                     IotMqtt_strerror( status ) );

        /* The network connection must be closed if it was created. */
        if( networkCreated == true )
        {
            networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );

            if( networkStatus != IOT_NETWORK_SUCCESS )
            {
                IotLogWarn( "Failed to close network connection." );
            }
            else
            {
                IotLogInfo( "Network connection closed on error." );
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }

        if( pOperation != NULL )
        {
            _IotMqtt_DestroyOperation( pOperation );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }

        if( pNewMqttConnection != NULL )
        {
            _destroyMqttConnection( pNewMqttConnection );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        IotLogInfo( "New MQTT connection %p established.", pMqttConnection );

        /* Set the output parameter. */
        *pMqttConnection = pNewMqttConnection;
    }

    IOT_FUNCTION_CLEANUP_END();
}