in source/client_channel_handler.c [697:786]
/**
 * Finishes a request that will not be tracked any further.
 *
 * Invokes the request's on_complete callback (when one is set) with the given
 * error code, then, while holding the connection's synced-data lock, removes
 * the request's packet id from the outstanding requests table and releases the
 * request back to the connection's requests pool.
 *
 * The callback is invoked before the lock is taken, so it never runs while the
 * synced-data lock is held by this function.
 */
static void s_finish_outgoing_request(
    struct aws_mqtt_client_connection *connection,
    struct aws_mqtt_request *request,
    int error_code) {

    if (request->on_complete != NULL) {
        request->on_complete(connection, request->packet_id, error_code, request->on_complete_ud);
    }

    { /* BEGIN CRITICAL SECTION */
        mqtt_connection_lock_synced_data(connection);
        aws_hash_table_remove(&connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
        aws_memory_pool_release(&connection->synced_data.requests_pool, request);
        mqtt_connection_unlock_synced_data(connection);
    } /* END CRITICAL SECTION */
}

/**
 * Channel task that attempts to send one MQTT request.
 *
 * task   - the channel task being run (only used for logging here).
 * arg    - the struct aws_mqtt_request to send.
 * status - task status; AWS_TASK_STATUS_CANCELED means the request is not sent.
 *
 * When canceled:
 *   - a retryable request is appended to the connection's pending requests list;
 *   - any other request is completed with AWS_ERROR_MQTT_NOT_CONNECTED and released.
 *
 * Otherwise the request's send_request callback is invoked and its result decides
 * whether the request is completed right away (success or error) or parked in the
 * ongoing requests list.
 */
static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
    struct aws_mqtt_request *request = arg;
    struct aws_mqtt_client_connection *connection = request->connection;

    if (status == AWS_TASK_STATUS_CANCELED) {
        /* The task was canceled before the request was ever sent; decide whether it gets another chance. */
        if (!request->retryable) {
            AWS_LOGF_DEBUG(
                AWS_LS_MQTT_CLIENT,
                "static: task id %p, was canceled due to the channel shutting down. Request for packet id "
                "%" PRIu16 ". will NOT be retried, will be cancelled",
                (void *)task,
                request->packet_id);

            /* Not retryable: report the failure to the user and give the request back to the pool. */
            s_finish_outgoing_request(connection, request, AWS_ERROR_MQTT_NOT_CONNECTED);
            return;
        }

        AWS_LOGF_DEBUG(
            AWS_LS_MQTT_CLIENT,
            "static: task id %p, was canceled due to the channel shutting down. Request for packet id "
            "%" PRIu16 ". will be retried",
            (void *)task,
            request->packet_id);

        /* Retryable: park the request in the offline (pending) queue. */
        { /* BEGIN CRITICAL SECTION */
            mqtt_connection_lock_synced_data(connection);
            aws_linked_list_push_back(&connection->synced_data.pending_requests_list, &request->list_node);
            mqtt_connection_unlock_synced_data(connection);
        } /* END CRITICAL SECTION */
        return;
    }

    /* Attempt the send; the second argument is true only on the very first attempt for this request. */
    const bool is_first_attempt = !request->initiated;
    enum aws_mqtt_client_request_state send_result =
        request->send_request(request->packet_id, is_first_attempt, request->send_request_ud);
    request->initiated = true;

    int completion_error = AWS_ERROR_SUCCESS;

    switch (send_result) {
        case AWS_MQTT_CLIENT_REQUEST_ONGOING:
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT,
                "id=%p: request %" PRIu16 " sent, but waiting on an acknowledgement from peer.",
                (void *)connection,
                request->packet_id);

            /* Keep the request in the ongoing list. */
            aws_linked_list_push_back(&connection->thread_data.ongoing_requests_list, &request->list_node);
            break;

        case AWS_MQTT_CLIENT_REQUEST_ERROR:
            /* Capture the error right away, before any other call can overwrite it. */
            completion_error = aws_last_error();
            AWS_LOGF_ERROR(
                AWS_LS_MQTT_CLIENT,
                "id=%p: sending request %" PRIu16 " failed with error %d.",
                (void *)connection,
                request->packet_id,
                completion_error);
            /* fall-thru: a failed request is completed the same way, carrying the error code */

        case AWS_MQTT_CLIENT_REQUEST_COMPLETE:
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT,
                "id=%p: sending request %" PRIu16 " complete, invoking on_complete callback.",
                (void *)connection,
                request->packet_id);

            /* Nothing more to wait for: notify the user, drop the table entry, release the request. */
            s_finish_outgoing_request(connection, request, completion_error);
            break;
    }
}