in source/s3_client.c [950:1128]
/**
 * Default "process work" routine for an S3 client.
 *
 * The routine runs in five steps:
 *   1. While holding the synced_data lock: clear the "task scheduled" flag, mark the task as in
 *      progress, take ownership of the pending meta request work list, queue the prepared requests,
 *      reconcile threaded_data.num_requests_being_prepared, and snapshot the endpoint counters.
 *   2. Move each newly submitted meta request onto threaded_data.meta_requests, unless it is already
 *      flagged as scheduled.
 *   3. Update meta requests, then connections.
 *   4. Log a snapshot of the client's request/endpoint statistics.
 *   5. While holding the lock again: clear the "in progress" flag and decide whether the client can
 *      finish its destruction. vtable->finish_destroy is invoked after the lock has been released.
 *
 * @param client The client to process work for. Must be non-NULL, with a vtable that provides
 *               finish_destroy.
 */
static void s_s3_client_process_work_default(struct aws_s3_client *client) {
    AWS_PRECONDITION(client);
    AWS_PRECONDITION(client->vtable);
    AWS_PRECONDITION(client->vtable->finish_destroy);

    struct aws_linked_list meta_request_work_list;
    aws_linked_list_init(&meta_request_work_list);

    /*******************/
    /* Step 1: Move relevant data into thread local memory. */
    /*******************/
    AWS_LOGF_DEBUG(
        AWS_LS_S3_CLIENT,
        "id=%p s_s3_client_process_work_default - Moving relevant synced_data into threaded_data.",
        (void *)client);

    aws_s3_client_lock_synced_data(client);

    /* Once we exit this mutex, someone can reschedule this task. */
    client->synced_data.process_work_task_scheduled = false;
    client->synced_data.process_work_task_in_progress = true;

    /* Take the whole pending list in one swap so it can be walked after the lock is released. */
    aws_linked_list_swap_contents(&meta_request_work_list, &client->synced_data.pending_meta_request_work);

    uint32_t num_requests_queued =
        aws_s3_client_queue_requests_threaded(client, &client->synced_data.prepared_requests, false);

    /* Requests that were just queued no longer count as "being prepared". */
    {
        int sub_result = aws_sub_u32_checked(
            client->threaded_data.num_requests_being_prepared,
            num_requests_queued,
            &client->threaded_data.num_requests_being_prepared);

        AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
        (void)sub_result;
    }

    /* Neither do requests whose preparation failed; consume that counter and reset it. */
    {
        int sub_result = aws_sub_u32_checked(
            client->threaded_data.num_requests_being_prepared,
            client->synced_data.num_failed_prepare_requests,
            &client->threaded_data.num_requests_being_prepared);

        client->synced_data.num_failed_prepare_requests = 0;

        AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
        (void)sub_result;
    }

    /* Snapshot the endpoint counters under the lock; they are only used for the stats log in Step 4. */
    uint32_t num_endpoints_in_table = (uint32_t)aws_hash_table_get_entry_count(&client->synced_data.endpoints);
    uint32_t num_endpoints_allocated = client->synced_data.num_endpoints_allocated;

    aws_s3_client_unlock_synced_data(client);

    /*******************/
    /* Step 2: Push meta requests into the thread local list if they haven't already been scheduled. */
    /*******************/
    AWS_LOGF_DEBUG(
        AWS_LS_S3_CLIENT, "id=%p s_s3_client_process_work_default - Processing any new meta requests.", (void *)client);

    while (!aws_linked_list_empty(&meta_request_work_list)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_back(&meta_request_work_list);
        struct aws_s3_meta_request_work *meta_request_work =
            AWS_CONTAINER_OF(node, struct aws_s3_meta_request_work, node);

        AWS_FATAL_ASSERT(meta_request_work != NULL);
        AWS_FATAL_ASSERT(meta_request_work->meta_request != NULL);

        struct aws_s3_meta_request *meta_request = meta_request_work->meta_request;

        if (!meta_request->client_process_work_threaded_data.scheduled) {
            aws_linked_list_push_back(
                &client->threaded_data.meta_requests, &meta_request->client_process_work_threaded_data.node);

            meta_request->client_process_work_threaded_data.scheduled = true;
        } else {
            /* Already on the threaded list: release the meta request instead of adding it a second time.
             * NOTE(review): presumably this drops the reference held by the work item - confirm against
             * the code that enqueues pending_meta_request_work. */
            aws_s3_meta_request_release(meta_request);
            meta_request = NULL;
        }

        /* The work item itself is always freed, whichever branch was taken. */
        aws_mem_release(client->allocator, meta_request_work);
    }

    /*******************/
    /* Step 3: Update relevant meta requests and connections. */
    /*******************/
    {
        AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Updating meta requests.", (void *)client);
        aws_s3_client_update_meta_requests_threaded(client);

        AWS_LOGF_DEBUG(
            AWS_LS_S3_CLIENT, "id=%p Updating connections, assigning requests where possible.", (void *)client);
        aws_s3_client_update_connections_threaded(client);
    }

    /*******************/
    /* Step 4: Log client stats. */
    /*******************/
    {
        uint32_t num_requests_tracked_requests = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);

        uint32_t num_auto_ranged_get_network_io =
            s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_GET_OBJECT);
        uint32_t num_auto_ranged_put_network_io =
            s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_PUT_OBJECT);
        uint32_t num_auto_default_network_io =
            s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_DEFAULT);
        uint32_t num_requests_network_io =
            s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX);

        uint32_t num_requests_stream_queued_waiting =
            (uint32_t)aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting);
        uint32_t num_requests_streaming = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming);

        /* "Approximate" total: the individual counters are read one after another, not as one snapshot. */
        uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
                                         num_requests_streaming + client->threaded_data.num_requests_being_prepared +
                                         client->threaded_data.request_queue_size;

        /* All of these values are unsigned counters, so they are printed with %u. Each argument is cast
         * explicitly so that the type passed through the varargs always matches the conversion specifier,
         * mirroring the explicit casts used by the shutdown-progress log below. */
        AWS_LOGF(
            s_log_level_client_stats,
            AWS_LS_S3_CLIENT_STATS,
            "id=%p Requests-in-flight(approx/exact):%u/%u Requests-preparing:%u Requests-queued:%u "
            "Requests-network(get/put/default/total):%u/%u/%u/%u Requests-streaming-waiting:%u Requests-streaming:%u "
            " Endpoints(in-table/allocated):%u/%u",
            (void *)client,
            (unsigned int)total_approx_requests,
            (unsigned int)num_requests_tracked_requests,
            (unsigned int)client->threaded_data.num_requests_being_prepared,
            (unsigned int)client->threaded_data.request_queue_size,
            (unsigned int)num_auto_ranged_get_network_io,
            (unsigned int)num_auto_ranged_put_network_io,
            (unsigned int)num_auto_default_network_io,
            (unsigned int)num_requests_network_io,
            (unsigned int)num_requests_stream_queued_waiting,
            (unsigned int)num_requests_streaming,
            (unsigned int)num_endpoints_in_table,
            (unsigned int)num_endpoints_allocated);
    }

    /*******************/
    /* Step 5: Check for client shutdown. */
    /*******************/
    {
        aws_s3_client_lock_synced_data(client);
        client->synced_data.process_work_task_in_progress = false;

        /* This flag should never be set twice. If it was, that means a double-free could occur.*/
        AWS_ASSERT(!client->synced_data.finish_destroy);

        /* Destruction may only finish once the client is inactive and every condition below has cleared. */
        bool finish_destroy = client->synced_data.active == false &&
                              client->synced_data.start_destroy_executing == false &&
                              client->synced_data.body_streaming_elg_allocated == false &&
                              client->synced_data.process_work_task_scheduled == false &&
                              client->synced_data.process_work_task_in_progress == false &&
                              client->synced_data.num_endpoints_allocated == 0;

        client->synced_data.finish_destroy = finish_destroy;

        if (!client->synced_data.active) {
            AWS_LOGF_DEBUG(
                AWS_LS_S3_CLIENT,
                "id=%p Client shutdown progress: starting_destroy_executing=%d body_streaming_elg_allocated=%d "
                "process_work_task_scheduled=%d process_work_task_in_progress=%d num_endpoints_allocated=%d "
                "finish_destroy=%d",
                (void *)client,
                (int)client->synced_data.start_destroy_executing,
                (int)client->synced_data.body_streaming_elg_allocated,
                (int)client->synced_data.process_work_task_scheduled,
                (int)client->synced_data.process_work_task_in_progress,
                (int)client->synced_data.num_endpoints_allocated,
                (int)client->synced_data.finish_destroy);
        }

        aws_s3_client_unlock_synced_data(client);

        /* Use the local copy of the decision: the callback is invoked only after the lock is released. */
        if (finish_destroy) {
            client->vtable->finish_destroy(client);
        }
    }
}