in source/client.c [192:384]
static void s_mqtt_client_shutdown(
struct aws_client_bootstrap *bootstrap,
int error_code,
struct aws_channel *channel,
void *user_data) {
(void)bootstrap;
(void)channel;
struct aws_mqtt_client_connection *connection = user_data;
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code);
enum aws_mqtt_client_connection_state prev_state;
struct aws_linked_list cancelling_requests;
aws_linked_list_init(&cancelling_requests);
bool disconnected_state = false;
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
/* Move all the ongoing requests to the pending requests list, because the response they are waiting for will
* never arrives. Sad. But, we will retry. */
if (connection->clean_session) {
/* For a clean session, the Session lasts as long as the Network Connection. Thus, discard the previous
* session */
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Discard ongoing requests and pending requests when a clean session connection lost.",
(void *)connection);
aws_linked_list_move_all_back(&cancelling_requests, &connection->thread_data.ongoing_requests_list);
aws_linked_list_move_all_back(&cancelling_requests, &connection->synced_data.pending_requests_list);
} else {
aws_linked_list_move_all_back(
&connection->synced_data.pending_requests_list, &connection->thread_data.ongoing_requests_list);
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: All subscribe/unsubscribe and publish QoS>0 have been move to pending list",
(void *)connection);
}
prev_state = connection->synced_data.state;
switch (connection->synced_data.state) {
case AWS_MQTT_CLIENT_STATE_CONNECTED:
/* unexpected hangup from broker, try to reconnect */
mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_RECONNECTING);
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: connection was unexpected interrupted, switch state to RECONNECTING.",
(void *)connection);
break;
case AWS_MQTT_CLIENT_STATE_DISCONNECTING:
/* disconnect requested by user */
/* Successfully shutdown, so clear the outstanding requests */
/* TODO: respect the cleansession, clear the table when needed */
aws_hash_table_clear(&connection->synced_data.outstanding_requests_table);
disconnected_state = true;
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: disconnect finished, switch state to DISCONNECTED.",
(void *)connection);
break;
case AWS_MQTT_CLIENT_STATE_CONNECTING:
/* failed to connect */
disconnected_state = true;
break;
case AWS_MQTT_CLIENT_STATE_RECONNECTING:
/* reconnect failed, schedule the next attempt later, no need to change the state. */
break;
default:
/* AWS_MQTT_CLIENT_STATE_DISCONNECTED */
break;
}
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: current state is %d", (void *)connection, (int)connection->synced_data.state);
/* Always clear slot, as that's what's been shutdown */
if (connection->slot) {
aws_channel_slot_remove(connection->slot);
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: slot is removed successfully", (void *)connection);
connection->slot = NULL;
}
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
if (!aws_linked_list_empty(&cancelling_requests)) {
struct aws_linked_list_node *current = aws_linked_list_front(&cancelling_requests);
const struct aws_linked_list_node *end = aws_linked_list_end(&cancelling_requests);
while (current != end) {
struct aws_mqtt_request *request = AWS_CONTAINER_OF(current, struct aws_mqtt_request, list_node);
if (request->on_complete) {
request->on_complete(
connection,
request->packet_id,
AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION,
request->on_complete_ud);
}
current = current->next;
}
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
while (!aws_linked_list_empty(&cancelling_requests)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&cancelling_requests);
struct aws_mqtt_request *request = AWS_CONTAINER_OF(node, struct aws_mqtt_request, list_node);
aws_hash_table_remove(
&connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
aws_memory_pool_release(&connection->synced_data.requests_pool, request);
}
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
}
/* If there's no error code and this wasn't user-requested, set the error code to something useful */
if (error_code == AWS_ERROR_SUCCESS) {
if (prev_state != AWS_MQTT_CLIENT_STATE_DISCONNECTING && prev_state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) {
error_code = AWS_ERROR_MQTT_UNEXPECTED_HANGUP;
}
}
switch (prev_state) {
case AWS_MQTT_CLIENT_STATE_RECONNECTING: {
/* If reconnect attempt failed, schedule the next attempt */
struct aws_event_loop *el =
aws_event_loop_group_get_next_loop(connection->client->bootstrap->event_loop_group);
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Reconnect failed, retrying", (void *)connection);
aws_event_loop_schedule_task_future(
el, &connection->reconnect_task->task, connection->reconnect_timeouts.next_attempt);
break;
}
case AWS_MQTT_CLIENT_STATE_CONNECTED: {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: Connection interrupted, calling callback and attempting reconnect",
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_interrupted, error_code);
/* In case user called disconnect from the on_interrupted callback */
bool stop_reconnect;
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
stop_reconnect = connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING;
if (stop_reconnect) {
disconnected_state = true;
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: disconnect finished, switch state to DISCONNECTED.",
(void *)connection);
}
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
if (!stop_reconnect) {
/* Attempt the reconnect immediately, which will schedule a task to retry if it doesn't succeed */
connection->reconnect_task->task.fn(
&connection->reconnect_task->task, connection->reconnect_task->task.arg, AWS_TASK_STATUS_RUN_READY);
}
break;
}
default:
break;
}
if (disconnected_state) {
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_DISCONNECTED);
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
switch (prev_state) {
case AWS_MQTT_CLIENT_STATE_CONNECTED:
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Caller requested disconnect from on_interrupted callback, aborting reconnect",
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
break;
case AWS_MQTT_CLIENT_STATE_DISCONNECTING:
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: Disconnect completed, clearing request queue and calling callback",
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
break;
case AWS_MQTT_CLIENT_STATE_CONNECTING:
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Initial connection attempt failed, calling callback",
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, error_code, 0, false);
break;
default:
break;
}
/* The connection can die now. Release the refcount */
aws_mqtt_client_connection_release(connection);
}
}