in source/client_channel_handler.c [69:173]
static int s_packet_handler_connack(
struct aws_mqtt_client_connection *connection,
struct aws_byte_cursor message_cursor) {
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: CONNACK received", (void *)connection);
struct aws_mqtt_packet_connack connack;
if (aws_mqtt_packet_connack_decode(&message_cursor, &connack)) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT, "id=%p: error %d parsing CONNACK packet", (void *)connection, aws_last_error());
return AWS_OP_ERR;
}
bool was_reconnecting;
struct aws_linked_list requests;
aws_linked_list_init(&requests);
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
/* User requested disconnect, don't do anything */
if (connection->synced_data.state >= AWS_MQTT_CLIENT_STATE_DISCONNECTING) {
mqtt_connection_unlock_synced_data(connection);
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: User has requested disconnect, dropping connection", (void *)connection);
return AWS_OP_SUCCESS;
}
was_reconnecting = connection->synced_data.state == AWS_MQTT_CLIENT_STATE_RECONNECTING;
if (connack.connect_return_code == AWS_MQTT_CONNECT_ACCEPTED) {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: connection was accepted, switch state from %d to CONNECTED.",
(void *)connection,
(int)connection->synced_data.state);
/* Don't change the state if it's not ACCEPTED by broker */
mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_CONNECTED);
aws_linked_list_swap_contents(&connection->synced_data.pending_requests_list, &requests);
}
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
connection->connection_count++;
/* Reset the current timeout timer */
connection->reconnect_timeouts.current = connection->reconnect_timeouts.min;
if (connack.connect_return_code == AWS_MQTT_CONNECT_ACCEPTED) {
/* If successfully connected, schedule all pending tasks */
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: connection was accepted processing offline requests.", (void *)connection);
if (!aws_linked_list_empty(&requests)) {
struct aws_linked_list_node *current = aws_linked_list_front(&requests);
const struct aws_linked_list_node *end = aws_linked_list_end(&requests);
do {
struct aws_mqtt_request *request = AWS_CONTAINER_OF(current, struct aws_mqtt_request, list_node);
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: processing offline request %" PRIu16,
(void *)connection,
request->packet_id);
aws_channel_schedule_task_now(connection->slot->channel, &request->outgoing_task);
current = current->next;
} while (current != end);
}
} else {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT,
"id=%p: invalid connect return code %d, disconnecting",
(void *)connection,
connack.connect_return_code);
/* If error code returned, disconnect, on_completed will be invoked from shutdown process */
aws_channel_shutdown(connection->slot->channel, AWS_ERROR_MQTT_PROTOCOL_ERROR);
return AWS_OP_SUCCESS;
}
/* It is possible for a connection to complete, and a hangup to occur before the
* CONNECT/CONNACK cycle completes. In that case, we must deliver on_connection_complete
* on the first successful CONNACK or user code will never think it's connected */
if (was_reconnecting && connection->connection_count > 1) {
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: connection is a resumed connection, invoking on_resumed callback",
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_resumed, connack.connect_return_code, connack.session_present);
} else {
aws_create_reconnect_task(connection);
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: connection is a new connection, invoking on_connection_complete callback",
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK_ARGS(
connection, on_connection_complete, AWS_OP_SUCCESS, connack.connect_return_code, connack.session_present);
}
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: connection callback completed", (void *)connection);
s_schedule_ping(connection);
return AWS_OP_SUCCESS;
}