static enum aws_mqtt_client_request_state s_unsubscribe_send()

in source/client.c [2465:2559]


static enum aws_mqtt_client_request_state s_unsubscribe_send(
    uint16_t packet_id,
    bool is_first_attempt,
    void *userdata) {

    (void)is_first_attempt;

    struct unsubscribe_task_arg *task_arg = userdata;
    struct aws_io_message *message = NULL;

    AWS_LOGF_TRACE(
        AWS_LS_MQTT_CLIENT,
        "id=%p: Attempting send of unsubscribe %" PRIu16 " %s",
        (void *)task_arg->connection,
        packet_id,
        is_first_attempt ? "first attempt" : "resend");

    static const size_t num_topics = 1;

    AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, num_topics * aws_mqtt_topic_tree_action_size);
    struct aws_array_list transaction;
    aws_array_list_init_static(&transaction, transaction_buf, num_topics, aws_mqtt_topic_tree_action_size);

    if (!task_arg->tree_updated) {

        struct subscribe_task_topic *topic;
        if (aws_mqtt_topic_tree_transaction_remove(
                &task_arg->connection->thread_data.subscriptions, &transaction, &task_arg->filter, (void **)&topic)) {
            goto handle_error;
        }

        task_arg->is_local = topic ? topic->is_local : false;
    }

    if (!task_arg->is_local) {
        if (task_arg->unsubscribe.fixed_header.packet_type == 0) {
            /* If unsubscribe packet is uninitialized, init it */
            if (aws_mqtt_packet_unsubscribe_init(&task_arg->unsubscribe, task_arg->connection->allocator, packet_id)) {
                goto handle_error;
            }
            if (aws_mqtt_packet_unsubscribe_add_topic(&task_arg->unsubscribe, task_arg->filter)) {
                goto handle_error;
            }
        }

        message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->unsubscribe.fixed_header);
        if (!message) {
            goto handle_error;
        }

        if (aws_mqtt_packet_unsubscribe_encode(&message->message_data, &task_arg->unsubscribe)) {
            goto handle_error;
        }

        if (aws_channel_slot_send_message(task_arg->connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
            goto handle_error;
        }

        /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
         * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
         * fire fire the on_completion callbacks. */
        struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(task_arg->connection, packet_id);
        if (!timeout_task_arg) {
            return AWS_MQTT_CLIENT_REQUEST_ERROR;
        }

        /*
         * Set up mutual references between the operation task args and the timeout task args.  Whoever runs first
         * "wins", does its logic, and then breaks the connection between the two.
         */
        task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
        timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
    }

    if (!task_arg->tree_updated) {
        aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction);
        task_arg->tree_updated = true;
    }

    aws_array_list_clean_up(&transaction);
    /* If the subscribe is local-only, don't wait for a SUBACK to come back. */
    return task_arg->is_local ? AWS_MQTT_CLIENT_REQUEST_COMPLETE : AWS_MQTT_CLIENT_REQUEST_ONGOING;

handle_error:

    if (message) {
        aws_mem_release(message->allocator, message);
    }
    if (!task_arg->tree_updated) {
        aws_mqtt_topic_tree_transaction_roll_back(&task_arg->connection->thread_data.subscriptions, &transaction);
    }

    aws_array_list_clean_up(&transaction);
    return AWS_MQTT_CLIENT_REQUEST_ERROR;
}