in source/client.c [1363:1559]
int aws_mqtt_client_connection_connect(
struct aws_mqtt_client_connection *connection,
const struct aws_mqtt_connection_options *connection_options) {
/* TODO: Do we need to support resuming the connection if user connect to the same connection & endpoint and the
* clean_session is false?
* If not, the broker will resume the connection in this case, and we pretend we are making a new connection, which
* may cause some confusing behavior. This is basically what we have now. NOTE: The topic_tree is living with the
* connection right now, which is really confusing.
* If yes, an edge case will be: User disconnected from the connection with clean_session
* being false, then connect to another endpoint with the same connection object, we probably need to clear all
* those states from last connection and create a new "connection". Problem is what if user finish the second
* connection and reconnect to the first endpoint. There is no way for us to resume the connection in this case. */
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Opening connection", (void *)connection);
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) {
mqtt_connection_unlock_synced_data(connection);
return aws_raise_error(AWS_ERROR_MQTT_ALREADY_CONNECTED);
}
mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_CONNECTING);
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT, "id=%p: Begin connecting process, switch state to CONNECTING.", (void *)connection);
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
if (connection->host_name) {
aws_string_destroy(connection->host_name);
}
connection->host_name = aws_string_new_from_array(
connection->allocator, connection_options->host_name.ptr, connection_options->host_name.len);
connection->port = connection_options->port;
connection->socket_options = *connection_options->socket_options;
connection->clean_session = connection_options->clean_session;
connection->keep_alive_time_secs = connection_options->keep_alive_time_secs;
connection->connection_count = 0;
if (!connection->keep_alive_time_secs) {
connection->keep_alive_time_secs = s_default_keep_alive_sec;
}
if (!connection_options->protocol_operation_timeout_ms) {
connection->operation_timeout_ns = UINT64_MAX;
} else {
connection->operation_timeout_ns = aws_timestamp_convert(
(uint64_t)connection_options->protocol_operation_timeout_ms,
AWS_TIMESTAMP_MILLIS,
AWS_TIMESTAMP_NANOS,
NULL);
}
if (!connection_options->ping_timeout_ms) {
connection->ping_timeout_ns = s_default_ping_timeout_ns;
} else {
connection->ping_timeout_ns = aws_timestamp_convert(
(uint64_t)connection_options->ping_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
}
/* Keep alive time should always be greater than the timeouts. */
if (AWS_UNLIKELY(connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS <= connection->ping_timeout_ns)) {
AWS_LOGF_FATAL(
AWS_LS_MQTT_CLIENT,
"id=%p: Illegal configuration, Connection keep alive %" PRIu64
"ns must be greater than the request timeouts %" PRIu64 "ns.",
(void *)connection,
(uint64_t)connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS,
connection->ping_timeout_ns);
AWS_FATAL_ASSERT(
connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS > connection->ping_timeout_ns);
}
AWS_LOGF_INFO(
AWS_LS_MQTT_CLIENT,
"id=%p: using ping timeout of %" PRIu64 " ns",
(void *)connection,
connection->ping_timeout_ns);
/* Cheat and set the tls_options host_name to our copy if they're the same */
if (connection_options->tls_options) {
connection->use_tls = true;
if (aws_tls_connection_options_copy(&connection->tls_options, connection_options->tls_options)) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy TLS Connection Options into connection", (void *)connection);
return AWS_OP_ERR;
}
if (!connection_options->tls_options->server_name) {
struct aws_byte_cursor host_name_cur = aws_byte_cursor_from_string(connection->host_name);
if (aws_tls_connection_options_set_server_name(
&connection->tls_options, connection->allocator, &host_name_cur)) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT, "id=%p: Failed to set TLS Connection Options server name", (void *)connection);
goto error;
}
}
} else {
AWS_ZERO_STRUCT(connection->tls_options);
}
/* Clean up old client_id */
if (connection->client_id.buffer) {
aws_byte_buf_clean_up(&connection->client_id);
}
/* Only set connection->client_id if a new one was provided */
struct aws_byte_buf client_id_buf =
aws_byte_buf_from_array(connection_options->client_id.ptr, connection_options->client_id.len);
if (aws_byte_buf_init_copy(&connection->client_id, connection->allocator, &client_id_buf)) {
AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy client_id into connection", (void *)connection);
goto error;
}
struct aws_linked_list cancelling_requests;
aws_linked_list_init(&cancelling_requests);
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
if (connection->clean_session) {
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: a clean session connection requested, all the previous requests will fail",
(void *)connection);
aws_linked_list_swap_contents(&connection->synced_data.pending_requests_list, &cancelling_requests);
}
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
if (!aws_linked_list_empty(&cancelling_requests)) {
struct aws_linked_list_node *current = aws_linked_list_front(&cancelling_requests);
const struct aws_linked_list_node *end = aws_linked_list_end(&cancelling_requests);
/* invoke all the complete callback for requests from previous session */
while (current != end) {
struct aws_mqtt_request *request = AWS_CONTAINER_OF(current, struct aws_mqtt_request, list_node);
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Establishing a new clean session connection, discard the previous request %" PRIu16,
(void *)connection,
request->packet_id);
if (request->on_complete) {
request->on_complete(
connection,
request->packet_id,
AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION,
request->on_complete_ud);
}
current = current->next;
}
/* free the resource */
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
while (!aws_linked_list_empty(&cancelling_requests)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&cancelling_requests);
struct aws_mqtt_request *request = AWS_CONTAINER_OF(node, struct aws_mqtt_request, list_node);
aws_hash_table_remove(
&connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
aws_memory_pool_release(&connection->synced_data.requests_pool, request);
}
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
}
/* Begin the connecting process, acquire the connection to keep it alive until we disconnected */
aws_mqtt_client_connection_acquire(connection);
if (s_mqtt_client_connect(connection, connection_options->on_connection_complete, connection_options->user_data)) {
/*
* An error calling s_mqtt_client_connect should (must) be mutually exclusive with s_mqtt_client_shutdown().
* So it should be safe and correct to call release now to undo the pinning we did a few lines above.
*/
aws_mqtt_client_connection_release(connection);
/* client_id has been updated with something but it will get cleaned up when the connection gets cleaned up
* so we don't need to worry about it here*/
if (connection->clean_session) {
AWS_LOGF_WARN(
AWS_LS_MQTT_CLIENT, "id=%p: The previous session has been cleaned up and losted!", (void *)connection);
}
goto error;
}
return AWS_OP_SUCCESS;
error:
aws_tls_connection_options_clean_up(&connection->tls_options);
AWS_ZERO_STRUCT(connection->tls_options);
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_DISCONNECTED);
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
return AWS_OP_ERR;
}