static IotMqttError_t _subscriptionCommon()

in device_firmware/libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c [543:755]


/**
 * @brief Shared implementation of the SUBSCRIBE and UNSUBSCRIBE operations.
 *
 * Validates the arguments, selects a packet serializer, creates and serializes
 * the operation, updates the connection's subscriptions and finally schedules
 * the packet for sending.
 *
 * @param[in] operation Either #IOT_MQTT_SUBSCRIBE or #IOT_MQTT_UNSUBSCRIBE.
 * @param[in] mqttConnection The MQTT connection the operation belongs to.
 * @param[in] pSubscriptionList Subscriptions to add or remove.
 * @param[in] subscriptionCount Number of elements in `pSubscriptionList`.
 * @param[in] flags Operation flags. When #IOT_MQTT_FLAG_WAITABLE is set,
 * `pOperationReference` must not be `NULL`.
 * @param[in] pCallbackInfo Forwarded unchanged to #_IotMqtt_CreateOperation.
 * @param[out] pOperationReference If not `NULL`, set to the new operation just
 * before it is scheduled, and reset to #IOT_MQTT_OPERATION_INITIALIZER if
 * scheduling fails.
 *
 * @return #IOT_MQTT_STATUS_PENDING once the operation has been scheduled;
 * #IOT_MQTT_BAD_PARAMETER if the subscription list fails validation or a
 * waitable operation was requested without a reference; otherwise the failing
 * status from operation creation, serialization, subscription bookkeeping or
 * scheduling.
 */
static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
                                           IotMqttConnection_t mqttConnection,
                                           const IotMqttSubscription_t * pSubscriptionList,
                                           size_t subscriptionCount,
                                           uint32_t flags,
                                           const IotMqttCallbackInfo_t * pCallbackInfo,
                                           IotMqttOperation_t * pOperationReference )
{
    IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );

    /* The operation under construction; remains NULL until creation succeeds,
     * which is what the cleanup code relies on. */
    _mqttOperation_t * pSubscriptionOperation = NULL;

    /* Function that turns the subscription list into an MQTT packet. */
    IotMqttError_t ( * serializeSubscription )( const IotMqttSubscription_t *,
                                                size_t,
                                                uint8_t **,
                                                size_t *,
                                                uint16_t * ) = NULL;

    /* Facts about the request that are tested more than once below. */
    const bool isSubscribe = ( operation == IOT_MQTT_SUBSCRIBE );
    const bool isWaitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE );
    bool listValid = false;

    /* Callers may only request a subscribe or an unsubscribe. */
    IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||
                    ( operation == IOT_MQTT_UNSUBSCRIBE ) );

    /* Every element of the subscription list has to pass validation. */
    listValid = _IotMqtt_ValidateSubscriptionList( operation,
                                                   mqttConnection->awsIotMqttMode,
                                                   pSubscriptionList,
                                                   subscriptionCount );

    if( listValid == false )
    {
        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* A waitable operation must hand its reference back to the caller. */
    if( ( isWaitable == true ) && ( pOperationReference == NULL ) )
    {
        IotLogError( "Reference must be provided for a waitable %s.",
                     IotMqtt_OperationType( operation ) );

        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Default to the library serializer that matches the operation type. */
    serializeSubscription = ( isSubscribe == true ) ?
                            _IotMqtt_SerializeSubscribe :
                            _IotMqtt_SerializeUnsubscribe;

    /* A serializer attached to the connection takes precedence, but only the
     * member matching this operation type is examined. */
    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
        if( mqttConnection->pSerializer != NULL )
        {
            if( ( isSubscribe == true ) &&
                ( mqttConnection->pSerializer->serialize.subscribe != NULL ) )
            {
                serializeSubscription = mqttConnection->pSerializer->serialize.subscribe;
            }
            else if( ( isSubscribe == false ) &&
                     ( mqttConnection->pSerializer->serialize.unsubscribe != NULL ) )
            {
                serializeSubscription = mqttConnection->pSerializer->serialize.unsubscribe;
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */

    /* An UNSUBSCRIBE drops its topic filters from the connection up front,
     * before the operation itself is even created. */
    if( operation == IOT_MQTT_UNSUBSCRIBE )
    {
        _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection, pSubscriptionList, subscriptionCount );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Allocate the operation that will carry the packet. */
    status = _IotMqtt_CreateOperation( mqttConnection, flags, pCallbackInfo, &pSubscriptionOperation );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }

    /* Sanity-check the fresh operation, then record what kind it is. */
    IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
    IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );
    pSubscriptionOperation->u.operation.type = operation;

    /* Serialize the subscription list; the serializer fills in the packet
     * buffer, its size and the packet identifier. */
    status = serializeSubscription( pSubscriptionList,
                                    subscriptionCount,
                                    &( pSubscriptionOperation->u.operation.pMqttPacket ),
                                    &( pSubscriptionOperation->u.operation.packetSize ),
                                    &( pSubscriptionOperation->u.operation.packetIdentifier ) );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }

    /* A successful serialization must have produced a non-empty packet. */
    IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL );
    IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 );

    /* A SUBSCRIBE registers its subscriptions under the packet identifier so
     * they can be removed again if sending cannot be scheduled. */
    if( isSubscribe == true )
    {
        status = _IotMqtt_AddSubscriptions( mqttConnection,
                                            pSubscriptionOperation->u.operation.packetIdentifier,
                                            pSubscriptionList,
                                            subscriptionCount );

        if( status != IOT_MQTT_SUCCESS )
        {
            IOT_GOTO_CLEANUP();
        }
    }

    /* Publish the reference before scheduling; it is cleared again below if
     * scheduling fails. */
    if( pOperationReference != NULL )
    {
        *pOperationReference = pSubscriptionOperation;
    }

    /* Queue the packet for transmission. */
    status = _IotMqtt_ScheduleOperation( pSubscriptionOperation, _IotMqtt_ProcessSend, 0 );

    if( status != IOT_MQTT_SUCCESS )
    {
        IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",
                     mqttConnection,
                     IotMqtt_OperationType( operation ) );

        /* Undo the subscriptions registered above for this packet. */
        if( isSubscribe == true )
        {
            _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,
                                                 pSubscriptionOperation->u.operation.packetIdentifier,
                                                 -1 );
        }

        /* The reference handed out earlier is about to become invalid. */
        if( pOperationReference != NULL )
        {
            *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;
        }

        IOT_GOTO_CLEANUP();
    }

    /* Common exit path for both success and failure. */
    IOT_FUNCTION_CLEANUP_BEGIN();

    if( status == IOT_MQTT_SUCCESS )
    {
        /* Scheduling succeeded, so the operation is reported as pending. */
        status = IOT_MQTT_STATUS_PENDING;

        IotLogInfo( "(MQTT connection %p) %s operation scheduled.",
                    mqttConnection,
                    IotMqtt_OperationType( operation ) );
    }
    else
    {
        /* Release the operation if the failure happened after its creation. */
        if( pSubscriptionOperation != NULL )
        {
            _IotMqtt_DestroyOperation( pSubscriptionOperation );
        }
    }

    IOT_FUNCTION_CLEANUP_END();
}