in device_firmware/libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c [885:1233]
/**
 * @brief Establish a new MQTT connection and block until the CONNECT
 * operation completes or fails.
 *
 * The function validates its arguments, obtains a network connection (created
 * through the network interface when `pNetworkInfo->createNetworkConnection`
 * is true, otherwise taken from `pNetworkInfo->u.pNetworkConnection`), builds
 * an MQTT connection object, serializes and schedules a CONNECT packet, and
 * waits for the operation with #IotMqtt_Wait. When the connection succeeds and
 * its keep-alive period is nonzero, the first keep-alive job is scheduled.
 *
 * @param[in] pNetworkInfo Network interface and connection setup. Must not be
 * NULL and must carry a non-NULL network interface.
 * @param[in] pConnectInfo CONNECT parameters; checked by
 * _IotMqtt_ValidateConnect. Previous subscriptions are validated and restored
 * only when `cleanSession` is false.
 * @param[in] timeoutMs Timeout handed to #IotMqtt_Wait for the CONNECT operation.
 * @param[out] pMqttConnection Receives the new connection handle on success.
 * Must not be NULL; it is not written on failure.
 *
 * @return #IOT_MQTT_SUCCESS on success; #IOT_MQTT_BAD_PARAMETER for invalid
 * arguments; #IOT_MQTT_NETWORK_ERROR if the network connection cannot be
 * created or the receive callback cannot be set; #IOT_MQTT_NO_MEMORY if the
 * connection object cannot be allocated; #IOT_MQTT_SCHEDULING_ERROR if the
 * keep-alive job cannot be scheduled; otherwise the status reported by the
 * operation-creation, subscription, serialization, scheduling or wait step.
 *
 * @note On failure after a network connection has been obtained, that
 * connection is closed. This includes a connection supplied by the caller.
 */
IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
                                const IotMqttConnectInfo_t * pConnectInfo,
                                uint32_t timeoutMs,
                                IotMqttConnection_t * const pMqttConnection )
{
    IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );

    /* networkCreated: a network connection is available and must be closed on
     * failure. ownNetworkConnection: this call created it. */
    bool networkCreated = false, ownNetworkConnection = false;
    IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
    void * pNetworkConnection = NULL;
    _mqttOperation_t * pOperation = NULL;
    _mqttConnection_t * pNewMqttConnection = NULL;

    /* Default CONNECT serializer function. */
    IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,
                                           uint8_t **,
                                           size_t * ) = _IotMqtt_SerializeConnect;

    /* Network info must not be NULL. */
    if( pNetworkInfo == NULL )
    {
        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* The network interface is dereferenced below (create, close), so it
     * must be present. */
    if( pNetworkInfo->pNetworkInterface == NULL )
    {
        IotLogError( "Network interface cannot be NULL." );

        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* The output handle is written on success; reject NULL before any
     * resources are acquired instead of faulting after the connection is up. */
    if( pMqttConnection == NULL )
    {
        IotLogError( "MQTT connection output parameter cannot be NULL." );

        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Validate connect info. */
    if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )
    {
        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* If will info is provided, check that it is valid. */
    if( pConnectInfo->pWillInfo != NULL )
    {
        if( _IotMqtt_ValidatePublish( pConnectInfo->awsIotMqttMode,
                                      pConnectInfo->pWillInfo ) == false )
        {
            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
        }
        else if( pConnectInfo->pWillInfo->payloadLength > UINT16_MAX )
        {
            /* Will message payloads cannot be larger than 65535. This restriction
             * applies only to will messages, and not normal PUBLISH messages. */
            IotLogError( "Will payload cannot be larger than 65535." );

            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* If previous subscriptions are provided, check that they are valid.
     * They are only relevant when the session is not clean. */
    if( pConnectInfo->cleanSession == false )
    {
        if( pConnectInfo->pPreviousSubscriptions != NULL )
        {
            if( _IotMqtt_ValidateSubscriptionList( IOT_MQTT_SUBSCRIBE,
                                                   pConnectInfo->awsIotMqttMode,
                                                   pConnectInfo->pPreviousSubscriptions,
                                                   pConnectInfo->previousSubscriptionCount ) == false )
            {
                IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Create a new network connection if requested. Otherwise, copy the
     * existing network connection. */
    if( pNetworkInfo->createNetworkConnection == true )
    {
        networkStatus = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,
                                                                 pNetworkInfo->u.setup.pNetworkCredentialInfo,
                                                                 &pNetworkConnection );

        if( networkStatus == IOT_NETWORK_SUCCESS )
        {
            networkCreated = true;

            /* This MQTT connection owns the network connection it created and
             * should destroy it on cleanup. */
            ownNetworkConnection = true;
        }
        else
        {
            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
        }
    }
    else
    {
        /* A caller-supplied connection is flagged the same way, so it is also
         * closed if establishing the MQTT connection fails. */
        pNetworkConnection = pNetworkInfo->u.pNetworkConnection;
        networkCreated = true;
    }

    IotLogInfo( "Establishing new MQTT connection." );

    /* Initialize a new MQTT connection object. */
    pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,
                                                pNetworkInfo,
                                                pConnectInfo->keepAliveSeconds );

    if( pNewMqttConnection == NULL )
    {
        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
    }
    else
    {
        /* Set the network connection associated with the MQTT connection. */
        pNewMqttConnection->pNetworkConnection = pNetworkConnection;
        pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;

        /* Set the MQTT packet serializer overrides. */
        #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
            pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;
        #endif
    }

    /* Set the MQTT receive callback. */
    networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,
                                                                               IotMqtt_ReceiveCallback,
                                                                               pNewMqttConnection );

    if( networkStatus != IOT_NETWORK_SUCCESS )
    {
        IotLogError( "Failed to set MQTT network receive callback." );

        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Create a CONNECT operation. */
    status = _IotMqtt_CreateOperation( pNewMqttConnection,
                                       IOT_MQTT_FLAG_WAITABLE,
                                       NULL,
                                       &pOperation );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Ensure the members set by operation creation and serialization
     * are appropriate for a blocking CONNECT. */
    IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
    IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
                    == IOT_MQTT_FLAG_WAITABLE );
    IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );

    /* Set the operation type. */
    pOperation->u.operation.type = IOT_MQTT_CONNECT;

    /* Add previous session subscriptions. The condition mirrors the validation
     * above: a subscription list supplied together with a clean session was
     * never validated and is therefore not restored. */
    if( ( pConnectInfo->cleanSession == false ) &&
        ( pConnectInfo->pPreviousSubscriptions != NULL ) )
    {
        /* Previous subscription count should have been validated as nonzero. */
        IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );

        /* NOTE(review): the second argument is a hard-coded 2, presumably the
         * packet identifier recorded for the restored subscriptions; confirm
         * against _IotMqtt_AddSubscriptions. */
        status = _IotMqtt_AddSubscriptions( pNewMqttConnection,
                                            2,
                                            pConnectInfo->pPreviousSubscriptions,
                                            pConnectInfo->previousSubscriptionCount );

        if( status != IOT_MQTT_SUCCESS )
        {
            IOT_GOTO_CLEANUP();
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Choose a CONNECT serializer function. */
    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
        if( pNewMqttConnection->pSerializer != NULL )
        {
            if( pNewMqttConnection->pSerializer->serialize.connect != NULL )
            {
                serializeConnect = pNewMqttConnection->pSerializer->serialize.connect;
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */

    /* Convert the connect info and will info objects to an MQTT CONNECT packet. */
    status = serializeConnect( pConnectInfo,
                               &( pOperation->u.operation.pMqttPacket ),
                               &( pOperation->u.operation.packetSize ) );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Check the serialized MQTT packet. */
    IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
    IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );

    /* Add the CONNECT operation to the send queue for network transmission. */
    status = _IotMqtt_ScheduleOperation( pOperation,
                                         _IotMqtt_ProcessSend,
                                         0 );

    if( status != IOT_MQTT_SUCCESS )
    {
        /* pOperation is still set here, so the cleanup section destroys it. */
        IotLogError( "Failed to enqueue CONNECT for sending." );
    }
    else
    {
        /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */
        status = IotMqtt_Wait( pOperation,
                               timeoutMs );

        /* The call to wait cleans up the CONNECT operation, so set the pointer
         * to NULL. */
        pOperation = NULL;
    }

    /* When a connection is successfully established, schedule keep-alive job. */
    if( status == IOT_MQTT_SUCCESS )
    {
        /* Check if a keep-alive job should be scheduled. */
        if( pNewMqttConnection->keepAliveMs != 0 )
        {
            IotLogDebug( "Scheduling first MQTT keep-alive job." );

            taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
                                                           pNewMqttConnection->keepAliveJob,
                                                           pNewMqttConnection->nextKeepAliveMs );

            if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
            {
                IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    IOT_FUNCTION_CLEANUP_BEGIN();

    if( status != IOT_MQTT_SUCCESS )
    {
        IotLogError( "Failed to establish new MQTT connection, error %s.",
                     IotMqtt_strerror( status ) );

        /* The network connection must be closed if it was created. */
        if( networkCreated == true )
        {
            networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );

            if( networkStatus != IOT_NETWORK_SUCCESS )
            {
                IotLogWarn( "Failed to close network connection." );
            }
            else
            {
                IotLogInfo( "Network connection closed on error." );
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }

        if( pOperation != NULL )
        {
            _IotMqtt_DestroyOperation( pOperation );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }

        if( pNewMqttConnection != NULL )
        {
            _destroyMqttConnection( pNewMqttConnection );
        }
        else
        {
            /* No MQTT connection object exists to release a network connection
             * that this call created, so destroy it here to avoid leaking it. */
            if( ownNetworkConnection == true )
            {
                networkStatus = pNetworkInfo->pNetworkInterface->destroy( pNetworkConnection );

                if( networkStatus != IOT_NETWORK_SUCCESS )
                {
                    IotLogWarn( "Failed to destroy network connection." );
                }
                else
                {
                    EMPTY_ELSE_MARKER;
                }
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
    }
    else
    {
        /* Log the connection itself rather than the address of the caller's
         * handle variable. */
        IotLogInfo( "New MQTT connection %p established.", ( void * ) pNewMqttConnection );

        /* Set the output parameter. */
        *pMqttConnection = pNewMqttConnection;
    }

    IOT_FUNCTION_CLEANUP_END();
}