in device_firmware/libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c [543:755]
/**
 * @brief Shared implementation behind the SUBSCRIBE and UNSUBSCRIBE operations.
 *
 * Validates the request, creates an operation, serializes the packet with
 * either the default serializer or a connection-provided override, updates
 * the connection's subscription records through the helper functions, and
 * finally schedules the operation for transmission.
 *
 * @param[in] operation Either #IOT_MQTT_SUBSCRIBE or #IOT_MQTT_UNSUBSCRIBE
 * (checked with an assertion only).
 * @param[in] mqttConnection Connection to use; it is dereferenced here
 * without a NULL check.
 * @param[in] pSubscriptionList Subscriptions to add or remove.
 * @param[in] subscriptionCount Number of entries in `pSubscriptionList`.
 * @param[in] flags Operation flags; #IOT_MQTT_FLAG_WAITABLE makes
 * `pOperationReference` mandatory.
 * @param[in] pCallbackInfo Forwarded unchanged to the operation constructor.
 * @param[out] pOperationReference If not NULL, receives the new operation
 * before it is scheduled and is reset to #IOT_MQTT_OPERATION_INITIALIZER if
 * scheduling fails.
 *
 * @return #IOT_MQTT_STATUS_PENDING once the operation has been scheduled;
 * #IOT_MQTT_BAD_PARAMETER if the subscription list is rejected or a waitable
 * operation has no reference pointer; otherwise the failure status reported
 * by the create, serialize, add-subscriptions or schedule step.
 */
static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
                                           IotMqttConnection_t mqttConnection,
                                           const IotMqttSubscription_t * pSubscriptionList,
                                           size_t subscriptionCount,
                                           uint32_t flags,
                                           const IotMqttCallbackInfo_t * pCallbackInfo,
                                           IotMqttOperation_t * pOperationReference )
{
    IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );

    /* Operation built by this call; remains NULL until creation succeeds so
     * the cleanup section knows whether there is anything to destroy. */
    _mqttOperation_t * pOperation = NULL;

    /* Function used to turn the subscription list into an MQTT packet. */
    IotMqttError_t ( * serializer )( const IotMqttSubscription_t *,
                                     size_t,
                                     uint8_t **,
                                     size_t *,
                                     uint16_t * ) = NULL;

    /* Only SUBSCRIBE and UNSUBSCRIBE are expected here. */
    IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||
                    ( operation == IOT_MQTT_UNSUBSCRIBE ) );

    /* Reject the request if any element of the subscription list is invalid. */
    if( !_IotMqtt_ValidateSubscriptionList( operation,
                                            mqttConnection->awsIotMqttMode,
                                            pSubscriptionList,
                                            subscriptionCount ) )
    {
        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* A waitable operation is useless without somewhere to store its handle. */
    if( ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE ) &&
        ( pOperationReference == NULL ) )
    {
        IotLogError( "Reference must be provided for a waitable %s.",
                     IotMqtt_OperationType( operation ) );

        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Start from the default serializer for this operation type. */
    serializer = ( operation == IOT_MQTT_SUBSCRIBE ) ?
                 _IotMqtt_SerializeSubscribe :
                 _IotMqtt_SerializeUnsubscribe;

    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1

        /* Prefer the connection's own serializer when one is set for this
         * operation type. */
        if( operation == IOT_MQTT_SUBSCRIBE )
        {
            if( ( mqttConnection->pSerializer != NULL ) &&
                ( mqttConnection->pSerializer->serialize.subscribe != NULL ) )
            {
                serializer = mqttConnection->pSerializer->serialize.subscribe;
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            if( ( mqttConnection->pSerializer != NULL ) &&
                ( mqttConnection->pSerializer->serialize.unsubscribe != NULL ) )
            {
                serializer = mqttConnection->pSerializer->serialize.unsubscribe;
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */

    /* For an UNSUBSCRIBE, the matching subscriptions are dropped from the
     * connection up front, before the operation is even created. */
    if( operation == IOT_MQTT_UNSUBSCRIBE )
    {
        _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection,
                                                  pSubscriptionList,
                                                  subscriptionCount );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Allocate and initialize the operation. */
    status = _IotMqtt_CreateOperation( mqttConnection,
                                       flags,
                                       pCallbackInfo,
                                       &pOperation );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }

    /* A freshly created operation is expected to be pending with no retries
     * configured; record which kind of operation it is. */
    IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
    IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );
    pOperation->u.operation.type = operation;

    /* Build the packet; the serializer fills in the buffer, its size and the
     * packet identifier. */
    status = serializer( pSubscriptionList,
                         subscriptionCount,
                         &( pOperation->u.operation.pMqttPacket ),
                         &( pOperation->u.operation.packetSize ),
                         &( pOperation->u.operation.packetIdentifier ) );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }

    /* A successful serializer must have produced a non-empty packet. */
    IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
    IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );

    /* For a SUBSCRIBE, register the subscriptions under the packet identifier
     * that was just assigned. */
    if( operation == IOT_MQTT_SUBSCRIBE )
    {
        status = _IotMqtt_AddSubscriptions( mqttConnection,
                                            pOperation->u.operation.packetIdentifier,
                                            pSubscriptionList,
                                            subscriptionCount );

        if( status != IOT_MQTT_SUCCESS )
        {
            IOT_GOTO_CLEANUP();
        }
    }

    /* Hand the operation back to the caller BEFORE scheduling it; this order
     * is intentional and must be kept. */
    if( pOperationReference != NULL )
    {
        *pOperationReference = pOperation;
    }

    /* Queue the operation for network transmission. */
    status = _IotMqtt_ScheduleOperation( pOperation,
                                         _IotMqtt_ProcessSend,
                                         0 );

    if( status != IOT_MQTT_SUCCESS )
    {
        IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",
                     mqttConnection,
                     IotMqtt_OperationType( operation ) );

        /* Undo the subscription registration made above for a SUBSCRIBE. */
        if( operation == IOT_MQTT_SUBSCRIBE )
        {
            _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,
                                                 pOperation->u.operation.packetIdentifier,
                                                 -1 );
        }

        /* The handle given to the caller is about to become invalid. */
        if( pOperationReference != NULL )
        {
            *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;
        }

        IOT_GOTO_CLEANUP();
    }

    IOT_FUNCTION_CLEANUP_BEGIN();

    if( status == IOT_MQTT_SUCCESS )
    {
        /* Success at this point means "scheduled", which callers see as
         * a pending operation. */
        status = IOT_MQTT_STATUS_PENDING;

        IotLogInfo( "(MQTT connection %p) %s operation scheduled.",
                    mqttConnection,
                    IotMqtt_OperationType( operation ) );
    }
    else if( pOperation != NULL )
    {
        /* Failure after the operation was created: release it. */
        _IotMqtt_DestroyOperation( pOperation );
    }

    IOT_FUNCTION_CLEANUP_END();
}