static void s_s3_client_process_work_default()

in source/s3_client.c [950:1128]


/*
 * Runs one pass of the client's work processing. In order, it:
 *   1. Under the synced_data lock: clears the "scheduled" flag, marks the task as in progress, takes ownership of
 *      the pending meta request work list, queues the prepared requests, and adjusts the being-prepared counter.
 *   2. Adds every newly submitted meta request that is not already scheduled to threaded_data.meta_requests.
 *   3. Updates meta requests and connections.
 *   4. Logs client statistics.
 *   5. Under the synced_data lock: clears the in-progress flag and decides whether the client can finish
 *      destroying itself; if so, calls vtable->finish_destroy after the lock has been released.
 *
 * client, client->vtable and client->vtable->finish_destroy must all be non-NULL.
 *
 * NOTE(review): threaded_data is read and written here without holding the lock, so this presumably must only
 * run on the client's process-work thread -- confirm against the scheduling code.
 */
static void s_s3_client_process_work_default(struct aws_s3_client *client) {
    AWS_PRECONDITION(client);
    AWS_PRECONDITION(client->vtable);
    AWS_PRECONDITION(client->vtable->finish_destroy);

    /* Local list that receives the pending work items so they can be processed after the lock is released. */
    struct aws_linked_list meta_request_work_list;
    aws_linked_list_init(&meta_request_work_list);

    /*******************/
    /* Step 1: Move relevant data into thread local memory. */
    /*******************/
    AWS_LOGF_DEBUG(
        AWS_LS_S3_CLIENT,
        "id=%p s_s3_client_process_work_default - Moving relevant synced_data into threaded_data.",
        (void *)client);
    aws_s3_client_lock_synced_data(client);

    /* Once we exit this mutex, someone can reschedule this task. */
    client->synced_data.process_work_task_scheduled = false;
    client->synced_data.process_work_task_in_progress = true;

    aws_linked_list_swap_contents(&meta_request_work_list, &client->synced_data.pending_meta_request_work);

    uint32_t num_requests_queued =
        aws_s3_client_queue_requests_threaded(client, &client->synced_data.prepared_requests, false);

    /* Requests that were just queued no longer count as "being prepared". */
    {
        int sub_result = aws_sub_u32_checked(
            client->threaded_data.num_requests_being_prepared,
            num_requests_queued,
            &client->threaded_data.num_requests_being_prepared);

        /* The result is only inspected by the assert; the void cast keeps the variable "used" otherwise. */
        AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
        (void)sub_result;
    }

    /* Requests whose preparation failed also stop counting as "being prepared"; the failure tally is then reset. */
    {
        int sub_result = aws_sub_u32_checked(
            client->threaded_data.num_requests_being_prepared,
            client->synced_data.num_failed_prepare_requests,
            &client->threaded_data.num_requests_being_prepared);

        client->synced_data.num_failed_prepare_requests = 0;

        AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
        (void)sub_result;
    }

    /* Snapshot the endpoint counts while the lock is held; they are only used for the stats log in step 4. */
    uint32_t num_endpoints_in_table = (uint32_t)aws_hash_table_get_entry_count(&client->synced_data.endpoints);
    uint32_t num_endpoints_allocated = client->synced_data.num_endpoints_allocated;

    aws_s3_client_unlock_synced_data(client);

    /*******************/
    /* Step 2: Push meta requests into the thread local list if they haven't already been scheduled. */
    /*******************/
    AWS_LOGF_DEBUG(
        AWS_LS_S3_CLIENT, "id=%p s_s3_client_process_work_default - Processing any new meta requests.", (void *)client);

    /* NOTE(review): items are taken from the back of the list; if producers append with push_back, a batch is
     * handled newest-first -- confirm this ordering is intended. */
    while (!aws_linked_list_empty(&meta_request_work_list)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_back(&meta_request_work_list);
        struct aws_s3_meta_request_work *meta_request_work =
            AWS_CONTAINER_OF(node, struct aws_s3_meta_request_work, node);

        AWS_FATAL_ASSERT(meta_request_work != NULL);
        AWS_FATAL_ASSERT(meta_request_work->meta_request != NULL);

        struct aws_s3_meta_request *meta_request = meta_request_work->meta_request;

        if (!meta_request->client_process_work_threaded_data.scheduled) {
            /* First time this meta request is seen: link it into the threaded list. It is not released here, so
             * the reference held by the work item presumably now belongs to that list -- confirm. */
            aws_linked_list_push_back(
                &client->threaded_data.meta_requests, &meta_request->client_process_work_threaded_data.node);

            meta_request->client_process_work_threaded_data.scheduled = true;
        } else {
            /* Already in the threaded list: release the meta request held by this work item. */
            aws_s3_meta_request_release(meta_request);
            meta_request = NULL;
        }

        /* The work item itself is always freed, whichever branch was taken. */
        aws_mem_release(client->allocator, meta_request_work);
    }

    /*******************/
    /* Step 3: Update relevant meta requests and connections. */
    /*******************/
    {
        AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Updating meta requests.", (void *)client);
        aws_s3_client_update_meta_requests_threaded(client);

        AWS_LOGF_DEBUG(
            AWS_LS_S3_CLIENT, "id=%p Updating connections, assigning requests where possible.", (void *)client);
        aws_s3_client_update_connections_threaded(client);
    }

    /*******************/
    /* Step 4: Log client stats. */
    /*******************/
    {
        uint32_t num_requests_tracked_requests = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);

        uint32_t num_auto_ranged_get_network_io =
            s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_GET_OBJECT);
        uint32_t num_auto_ranged_put_network_io =
            s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_PUT_OBJECT);
        uint32_t num_auto_default_network_io =
            s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_DEFAULT);

        /* Passing AWS_S3_META_REQUEST_TYPE_MAX is used here to obtain the "total" figure in the log line. */
        uint32_t num_requests_network_io =
            s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX);

        uint32_t num_requests_stream_queued_waiting =
            (uint32_t)aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting);
        uint32_t num_requests_streaming = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming);

        /* "Approximate" because the components are sampled one after another rather than atomically together. */
        uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
                                         num_requests_streaming + client->threaded_data.num_requests_being_prepared +
                                         client->threaded_data.request_queue_size;

        /* All counters below are unsigned 32-bit values, so they are formatted with %u rather than %d. */
        AWS_LOGF(
            s_log_level_client_stats,
            AWS_LS_S3_CLIENT_STATS,
            "id=%p Requests-in-flight(approx/exact):%u/%u  Requests-preparing:%u  Requests-queued:%u  "
            "Requests-network(get/put/default/total):%u/%u/%u/%u  Requests-streaming-waiting:%u  Requests-streaming:%u "
            " Endpoints(in-table/allocated):%u/%u",
            (void *)client,
            total_approx_requests,
            num_requests_tracked_requests,
            client->threaded_data.num_requests_being_prepared,
            client->threaded_data.request_queue_size,
            num_auto_ranged_get_network_io,
            num_auto_ranged_put_network_io,
            num_auto_default_network_io,
            num_requests_network_io,
            num_requests_stream_queued_waiting,
            num_requests_streaming,
            num_endpoints_in_table,
            num_endpoints_allocated);
    }

    /*******************/
    /* Step 5: Check for client shutdown. */
    /*******************/
    {
        aws_s3_client_lock_synced_data(client);
        client->synced_data.process_work_task_in_progress = false;

        /* This flag should never be set twice. If it was, that means a double-free could occur.*/
        AWS_ASSERT(!client->synced_data.finish_destroy);

        /* Destruction may finish only once the client is inactive and nothing listed here is still outstanding. */
        bool finish_destroy = client->synced_data.active == false &&
                              client->synced_data.start_destroy_executing == false &&
                              client->synced_data.body_streaming_elg_allocated == false &&
                              client->synced_data.process_work_task_scheduled == false &&
                              client->synced_data.process_work_task_in_progress == false &&
                              client->synced_data.num_endpoints_allocated == 0;

        client->synced_data.finish_destroy = finish_destroy;

        if (!client->synced_data.active) {
            AWS_LOGF_DEBUG(
                AWS_LS_S3_CLIENT,
                "id=%p Client shutdown progress: starting_destroy_executing=%d  body_streaming_elg_allocated=%d  "
                "process_work_task_scheduled=%d  process_work_task_in_progress=%d  num_endpoints_allocated=%d "
                "finish_destroy=%d",
                (void *)client,
                (int)client->synced_data.start_destroy_executing,
                (int)client->synced_data.body_streaming_elg_allocated,
                (int)client->synced_data.process_work_task_scheduled,
                (int)client->synced_data.process_work_task_in_progress,
                (int)client->synced_data.num_endpoints_allocated,
                (int)client->synced_data.finish_destroy);
        }

        aws_s3_client_unlock_synced_data(client);

        /* Called from the local copy, after unlocking. NOTE(review): finish_destroy presumably frees the client,
         * so nothing may touch `client` after this call -- confirm. */
        if (finish_destroy) {
            client->vtable->finish_destroy(client);
        }
    }
}