int aws_mqtt_client_connection_connect()

in source/client.c [1363:1559]


int aws_mqtt_client_connection_connect(
    struct aws_mqtt_client_connection *connection,
    const struct aws_mqtt_connection_options *connection_options) {

    /* TODO: Do we need to support resuming the connection if user connect to the same connection & endpoint and the
     * clean_session is false?
     * If not, the broker will resume the connection in this case, and we pretend we are making a new connection, which
     * may cause some confusing behavior. This is basically what we have now. NOTE: The topic_tree is living with the
     * connection right now, which is really confusing.
     * If yes, an edge case will be: User disconnected from the connection with clean_session
     * being false, then connect to another endpoint with the same connection object, we probably need to clear all
     * those states from last connection and create a new "connection". Problem is what if user finish the second
     * connection and reconnect to the first endpoint. There is no way for us to resume the connection in this case. */

    AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Opening connection", (void *)connection);
    { /* BEGIN CRITICAL SECTION */
        mqtt_connection_lock_synced_data(connection);

        if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) {
            mqtt_connection_unlock_synced_data(connection);
            return aws_raise_error(AWS_ERROR_MQTT_ALREADY_CONNECTED);
        }
        mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_CONNECTING);
        AWS_LOGF_DEBUG(
            AWS_LS_MQTT_CLIENT, "id=%p: Begin connecting process, switch state to CONNECTING.", (void *)connection);
        mqtt_connection_unlock_synced_data(connection);
    } /* END CRITICAL SECTION */

    if (connection->host_name) {
        aws_string_destroy(connection->host_name);
    }

    connection->host_name = aws_string_new_from_array(
        connection->allocator, connection_options->host_name.ptr, connection_options->host_name.len);
    connection->port = connection_options->port;
    connection->socket_options = *connection_options->socket_options;
    connection->clean_session = connection_options->clean_session;
    connection->keep_alive_time_secs = connection_options->keep_alive_time_secs;
    connection->connection_count = 0;

    if (!connection->keep_alive_time_secs) {
        connection->keep_alive_time_secs = s_default_keep_alive_sec;
    }
    if (!connection_options->protocol_operation_timeout_ms) {
        connection->operation_timeout_ns = UINT64_MAX;
    } else {
        connection->operation_timeout_ns = aws_timestamp_convert(
            (uint64_t)connection_options->protocol_operation_timeout_ms,
            AWS_TIMESTAMP_MILLIS,
            AWS_TIMESTAMP_NANOS,
            NULL);
    }

    if (!connection_options->ping_timeout_ms) {
        connection->ping_timeout_ns = s_default_ping_timeout_ns;
    } else {
        connection->ping_timeout_ns = aws_timestamp_convert(
            (uint64_t)connection_options->ping_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
    }

    /* Keep alive time should always be greater than the timeouts. */
    if (AWS_UNLIKELY(connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS <= connection->ping_timeout_ns)) {
        AWS_LOGF_FATAL(
            AWS_LS_MQTT_CLIENT,
            "id=%p: Illegal configuration, Connection keep alive %" PRIu64
            "ns must be greater than the request timeouts %" PRIu64 "ns.",
            (void *)connection,
            (uint64_t)connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS,
            connection->ping_timeout_ns);
        AWS_FATAL_ASSERT(
            connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS > connection->ping_timeout_ns);
    }

    AWS_LOGF_INFO(
        AWS_LS_MQTT_CLIENT,
        "id=%p: using ping timeout of %" PRIu64 " ns",
        (void *)connection,
        connection->ping_timeout_ns);

    /* Cheat and set the tls_options host_name to our copy if they're the same */
    if (connection_options->tls_options) {
        connection->use_tls = true;
        if (aws_tls_connection_options_copy(&connection->tls_options, connection_options->tls_options)) {

            AWS_LOGF_ERROR(
                AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy TLS Connection Options into connection", (void *)connection);
            return AWS_OP_ERR;
        }

        if (!connection_options->tls_options->server_name) {
            struct aws_byte_cursor host_name_cur = aws_byte_cursor_from_string(connection->host_name);
            if (aws_tls_connection_options_set_server_name(
                    &connection->tls_options, connection->allocator, &host_name_cur)) {

                AWS_LOGF_ERROR(
                    AWS_LS_MQTT_CLIENT, "id=%p: Failed to set TLS Connection Options server name", (void *)connection);
                goto error;
            }
        }

    } else {
        AWS_ZERO_STRUCT(connection->tls_options);
    }

    /* Clean up old client_id */
    if (connection->client_id.buffer) {
        aws_byte_buf_clean_up(&connection->client_id);
    }

    /* Only set connection->client_id if a new one was provided */
    struct aws_byte_buf client_id_buf =
        aws_byte_buf_from_array(connection_options->client_id.ptr, connection_options->client_id.len);
    if (aws_byte_buf_init_copy(&connection->client_id, connection->allocator, &client_id_buf)) {
        AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy client_id into connection", (void *)connection);
        goto error;
    }

    struct aws_linked_list cancelling_requests;
    aws_linked_list_init(&cancelling_requests);
    { /* BEGIN CRITICAL SECTION */
        mqtt_connection_lock_synced_data(connection);
        if (connection->clean_session) {
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT,
                "id=%p: a clean session connection requested, all the previous requests will fail",
                (void *)connection);
            aws_linked_list_swap_contents(&connection->synced_data.pending_requests_list, &cancelling_requests);
        }
        mqtt_connection_unlock_synced_data(connection);
    } /* END CRITICAL SECTION */

    if (!aws_linked_list_empty(&cancelling_requests)) {

        struct aws_linked_list_node *current = aws_linked_list_front(&cancelling_requests);
        const struct aws_linked_list_node *end = aws_linked_list_end(&cancelling_requests);
        /* invoke all the complete callback for requests from previous session */
        while (current != end) {
            struct aws_mqtt_request *request = AWS_CONTAINER_OF(current, struct aws_mqtt_request, list_node);
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT,
                "id=%p: Establishing a new clean session connection, discard the previous request %" PRIu16,
                (void *)connection,
                request->packet_id);
            if (request->on_complete) {
                request->on_complete(
                    connection,
                    request->packet_id,
                    AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION,
                    request->on_complete_ud);
            }
            current = current->next;
        }
        /* free the resource */
        { /* BEGIN CRITICAL SECTION */
            mqtt_connection_lock_synced_data(connection);
            while (!aws_linked_list_empty(&cancelling_requests)) {
                struct aws_linked_list_node *node = aws_linked_list_pop_front(&cancelling_requests);
                struct aws_mqtt_request *request = AWS_CONTAINER_OF(node, struct aws_mqtt_request, list_node);
                aws_hash_table_remove(
                    &connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
                aws_memory_pool_release(&connection->synced_data.requests_pool, request);
            }
            mqtt_connection_unlock_synced_data(connection);
        } /* END CRITICAL SECTION */
    }

    /* Begin the connecting process, acquire the connection to keep it alive until we disconnected */
    aws_mqtt_client_connection_acquire(connection);

    if (s_mqtt_client_connect(connection, connection_options->on_connection_complete, connection_options->user_data)) {
        /*
         * An error calling s_mqtt_client_connect should (must) be mutually exclusive with s_mqtt_client_shutdown().
         * So it should be safe and correct to call release now to undo the pinning we did a few lines above.
         */
        aws_mqtt_client_connection_release(connection);

        /* client_id has been updated with something but it will get cleaned up when the connection gets cleaned up
         * so we don't need to worry about it here*/
        if (connection->clean_session) {
            AWS_LOGF_WARN(
                AWS_LS_MQTT_CLIENT, "id=%p: The previous session has been cleaned up and losted!", (void *)connection);
        }
        goto error;
    }

    return AWS_OP_SUCCESS;

error:
    aws_tls_connection_options_clean_up(&connection->tls_options);
    AWS_ZERO_STRUCT(connection->tls_options);
    { /* BEGIN CRITICAL SECTION */
        mqtt_connection_lock_synced_data(connection);
        mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_DISCONNECTED);
        mqtt_connection_unlock_synced_data(connection);
    } /* END CRITICAL SECTION */
    return AWS_OP_ERR;
}