static void s_mqtt_client_shutdown()

in source/client.c [192:384]


/**
 * Handles the shutdown of the channel that backs an MQTT client connection and drives the connection's
 * state machine accordingly. (Presumably installed as the channel shutdown callback of the client
 * bootstrap; the registration is not visible here.)
 *
 * The work is done in this order:
 *  1. Under the synced-data lock: dispose of in-flight requests (cancel them for a clean session, otherwise
 *     move them to the pending list), record the state the connection was in, apply the state transition
 *     that can be decided immediately, and remove the connection's channel slot.
 *  2. Outside the lock: invoke on_complete for every cancelled request, then re-take the lock to drop the
 *     requests from the outstanding-requests table and return them to the request pool.
 *  3. If the shutdown carried no error but was not user-requested, substitute AWS_ERROR_MQTT_UNEXPECTED_HANGUP.
 *  4. Depending on the recorded state: schedule the next reconnect attempt, or notify on_interrupted and
 *     attempt a reconnect right away (unless the user asked to disconnect from inside that callback).
 *  5. If the connection ends up disconnected: set the DISCONNECTED state, invoke the callback that matches
 *     the recorded state, and release a reference on the connection.
 *
 * @param bootstrap   Unused.
 * @param error_code  Error code the channel shut down with. Replaced by AWS_ERROR_MQTT_UNEXPECTED_HANGUP when
 *                    it is AWS_ERROR_SUCCESS and the connection was neither DISCONNECTING nor DISCONNECTED.
 * @param channel     Unused.
 * @param user_data   The struct aws_mqtt_client_connection this shutdown belongs to.
 */
static void s_mqtt_client_shutdown(
    struct aws_client_bootstrap *bootstrap,
    int error_code,
    struct aws_channel *channel,
    void *user_data) {

    /* Neither argument is used by this function. */
    (void)bootstrap;
    (void)channel;

    struct aws_mqtt_client_connection *connection = user_data;

    AWS_LOGF_TRACE(
        AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code);
    /* State read under the lock before any transition below; all later dispatch is keyed on this snapshot. */
    enum aws_mqtt_client_connection_state prev_state;
    /* Local list of requests to cancel; only populated when the connection uses a clean session. */
    struct aws_linked_list cancelling_requests;
    aws_linked_list_init(&cancelling_requests);
    /* Set when this shutdown must finish in DISCONNECTED; acted upon at the end of the function. */
    bool disconnected_state = false;
    { /* BEGIN CRITICAL SECTION */
        mqtt_connection_lock_synced_data(connection);
        /* The responses the ongoing requests are waiting for will never arrive on this channel, so the
         * ongoing list is emptied here: its requests are either cancelled (clean session) or moved to the
         * pending list (persistent session). */
        if (connection->clean_session) {
            /* For a clean session, the Session lasts as long as the Network Connection. Thus, discard the previous
             * session: both the ongoing and the pending requests are collected for cancellation. */
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT,
                "id=%p: Discard ongoing requests and pending requests when a clean session connection lost.",
                (void *)connection);
            aws_linked_list_move_all_back(&cancelling_requests, &connection->thread_data.ongoing_requests_list);
            aws_linked_list_move_all_back(&cancelling_requests, &connection->synced_data.pending_requests_list);
        } else {
            /* Persistent session: keep the requests by appending the ongoing ones to the pending list.
             * NOTE(review): presumably so they are retried after reconnecting; the retry itself happens
             * outside this function. */
            aws_linked_list_move_all_back(
                &connection->synced_data.pending_requests_list, &connection->thread_data.ongoing_requests_list);
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT,
                "id=%p: All subscribe/unsubscribe and publish QoS>0 have been move to pending list",
                (void *)connection);
        }
        prev_state = connection->synced_data.state;
        switch (connection->synced_data.state) {
            case AWS_MQTT_CLIENT_STATE_CONNECTED:
                /* unexpected hangup from broker, try to reconnect */
                mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_RECONNECTING);
                AWS_LOGF_DEBUG(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: connection was unexpected interrupted, switch state to RECONNECTING.",
                    (void *)connection);
                break;
            case AWS_MQTT_CLIENT_STATE_DISCONNECTING:
                /* disconnect requested by user */
                /* Successfully shutdown, so clear the outstanding requests */
                /* TODO: respect the cleansession, clear the table when needed */
                aws_hash_table_clear(&connection->synced_data.outstanding_requests_table);
                /* The state itself is only switched to DISCONNECTED at the end of the function. */
                disconnected_state = true;
                AWS_LOGF_DEBUG(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: disconnect finished, switch state to DISCONNECTED.",
                    (void *)connection);
                break;
            case AWS_MQTT_CLIENT_STATE_CONNECTING:
                /* failed to connect */
                disconnected_state = true;
                break;
            case AWS_MQTT_CLIENT_STATE_RECONNECTING:
                /* reconnect failed, schedule the next attempt later, no need to change the state. */
                break;
            default:
                /* AWS_MQTT_CLIENT_STATE_DISCONNECTED */
                break;
        }
        AWS_LOGF_TRACE(
            AWS_LS_MQTT_CLIENT, "id=%p: current state is %d", (void *)connection, (int)connection->synced_data.state);
        /* Always clear slot, as that's what's been shutdown */
        if (connection->slot) {
            aws_channel_slot_remove(connection->slot);
            AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: slot is removed successfully", (void *)connection);
            connection->slot = NULL;
        }

        mqtt_connection_unlock_synced_data(connection);
    } /* END CRITICAL SECTION */

    if (!aws_linked_list_empty(&cancelling_requests)) {
        /* First pass, outside the lock: notify the owner of every cancelled request. The nodes are only
         * walked, not removed, so the list is still complete for the release pass below. */
        struct aws_linked_list_node *current = aws_linked_list_front(&cancelling_requests);
        const struct aws_linked_list_node *end = aws_linked_list_end(&cancelling_requests);
        while (current != end) {
            struct aws_mqtt_request *request = AWS_CONTAINER_OF(current, struct aws_mqtt_request, list_node);
            if (request->on_complete) {
                request->on_complete(
                    connection,
                    request->packet_id,
                    AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION,
                    request->on_complete_ud);
            }
            current = current->next;
        }
        /* Second pass, under the lock: forget each request's packet id and hand the request back to the pool. */
        { /* BEGIN CRITICAL SECTION */
            mqtt_connection_lock_synced_data(connection);
            while (!aws_linked_list_empty(&cancelling_requests)) {
                struct aws_linked_list_node *node = aws_linked_list_pop_front(&cancelling_requests);
                struct aws_mqtt_request *request = AWS_CONTAINER_OF(node, struct aws_mqtt_request, list_node);
                aws_hash_table_remove(
                    &connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
                aws_memory_pool_release(&connection->synced_data.requests_pool, request);
            }
            mqtt_connection_unlock_synced_data(connection);
        } /* END CRITICAL SECTION */
    }

    /* If there's no error code and this wasn't user-requested, set the error code to something useful */
    if (error_code == AWS_ERROR_SUCCESS) {
        if (prev_state != AWS_MQTT_CLIENT_STATE_DISCONNECTING && prev_state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) {
            error_code = AWS_ERROR_MQTT_UNEXPECTED_HANGUP;
        }
    }
    /* Follow-up work that must run outside the lock. Dispatch on the state recorded before the transitions
     * above, not on the connection's current state. */
    switch (prev_state) {
        case AWS_MQTT_CLIENT_STATE_RECONNECTING: {
            /* If reconnect attempt failed, schedule the next attempt */
            struct aws_event_loop *el =
                aws_event_loop_group_get_next_loop(connection->client->bootstrap->event_loop_group);

            AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Reconnect failed, retrying", (void *)connection);

            /* The reconnect task is scheduled to run at reconnect_timeouts.next_attempt. */
            aws_event_loop_schedule_task_future(
                el, &connection->reconnect_task->task, connection->reconnect_timeouts.next_attempt);
            break;
        }
        case AWS_MQTT_CLIENT_STATE_CONNECTED: {
            AWS_LOGF_DEBUG(
                AWS_LS_MQTT_CLIENT,
                "id=%p: Connection interrupted, calling callback and attempting reconnect",
                (void *)connection);
            MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_interrupted, error_code);

            /* In case user called disconnect from the on_interrupted callback: re-read the state under the
             * lock, since the callback above ran unlocked and may have changed it. */
            bool stop_reconnect;
            { /* BEGIN CRITICAL SECTION */
                mqtt_connection_lock_synced_data(connection);
                stop_reconnect = connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING;
                if (stop_reconnect) {
                    disconnected_state = true;
                    AWS_LOGF_DEBUG(
                        AWS_LS_MQTT_CLIENT,
                        "id=%p: disconnect finished, switch state to DISCONNECTED.",
                        (void *)connection);
                }
                mqtt_connection_unlock_synced_data(connection);
            } /* END CRITICAL SECTION */
            if (!stop_reconnect) {
                /* Attempt the reconnect immediately, which will schedule a task to retry if it doesn't succeed.
                 * The task function is called directly here rather than being scheduled on an event loop. */
                connection->reconnect_task->task.fn(
                    &connection->reconnect_task->task, connection->reconnect_task->task.arg, AWS_TASK_STATUS_RUN_READY);
            }
            break;
        }
        default:
            break;
    }
    if (disconnected_state) {
        { /* BEGIN CRITICAL SECTION */
            mqtt_connection_lock_synced_data(connection);
            mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_DISCONNECTED);
            mqtt_connection_unlock_synced_data(connection);
        } /* END CRITICAL SECTION */
        /* Tell the user how the connection ended; which callback fires depends on the recorded state. */
        switch (prev_state) {
            case AWS_MQTT_CLIENT_STATE_CONNECTED:
                /* Only reachable when disconnect was requested during on_interrupted: that is the sole
                 * place where disconnected_state is set for a previously CONNECTED connection. */
                AWS_LOGF_TRACE(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: Caller requested disconnect from on_interrupted callback, aborting reconnect",
                    (void *)connection);
                MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
                break;
            case AWS_MQTT_CLIENT_STATE_DISCONNECTING:
                AWS_LOGF_DEBUG(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: Disconnect completed, clearing request queue and calling callback",
                    (void *)connection);
                MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
                break;
            case AWS_MQTT_CLIENT_STATE_CONNECTING:
                AWS_LOGF_TRACE(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: Initial connection attempt failed, calling callback",
                    (void *)connection);
                /* NOTE(review): the trailing 0 and false are presumably the return code and the
                 * session-present flag; confirm against the on_connection_complete signature. */
                MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, error_code, 0, false);
                break;
            default:
                break;
        }
        /* The connection can die now. Release the refcount.
         * NOTE(review): the matching acquire is not visible in this function. The connection must not be
         * touched after this call, since this may have been the last reference. */
        aws_mqtt_client_connection_release(connection);
    }
}