static void s_s3_client_prepare_callback_queue_request()

in source/s3_client.c [1130:1224]


/* Forward declaration of the completion callback that is handed to aws_s3_meta_request_prepare_request() by
 * aws_s3_client_update_meta_requests_threaded() below. The client is supplied as user_data at that call site.
 * NOTE(review): judging by the name, this queues the prepared request on the client; the definition is not visible
 * in this part of the file, so confirm there. */
static void s_s3_client_prepare_callback_queue_request(
    struct aws_s3_meta_request *meta_request,
    struct aws_s3_request *request,
    int error_code,
    void *user_data);

void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
    AWS_PRECONDITION(client);

    const uint32_t max_requests_in_flight = aws_s3_client_get_max_requests_in_flight(client);
    const uint32_t max_requests_prepare = aws_s3_client_get_max_requests_prepare(client);

    struct aws_linked_list meta_requests_work_remaining;
    aws_linked_list_init(&meta_requests_work_remaining);

    uint32_t num_requests_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);

    const uint32_t pass_flags[] = {
        AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE,
        0,
    };

    const uint32_t num_passes = AWS_ARRAY_SIZE(pass_flags);

    for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) {

        /* While:
         *     * Number of being-prepared + already-prepared-and-queued requests is less than the max that can be in the
         * preparation stage.
         *     * Total number of requests tracked by the client is less than the max tracked ("in flight") requests.
         *     * There are meta requests to get requests from.
         *
         * Then update meta requests to get new requests that can then be prepared (reading from any streams, signing,
         * etc.) for sending.
         */
        while ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) <
                   max_requests_prepare &&
               num_requests_in_flight < max_requests_in_flight &&
               !aws_linked_list_empty(&client->threaded_data.meta_requests)) {

            struct aws_linked_list_node *meta_request_node =
                aws_linked_list_begin(&client->threaded_data.meta_requests);
            struct aws_s3_meta_request *meta_request =
                AWS_CONTAINER_OF(meta_request_node, struct aws_s3_meta_request, client_process_work_threaded_data);

            struct aws_s3_endpoint *endpoint = meta_request->endpoint;
            AWS_ASSERT(endpoint != NULL);

            AWS_ASSERT(client->vtable->get_host_address_count);
            size_t num_known_vips = client->vtable->get_host_address_count(
                client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);

            /* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in
             * ramping up requests just yet. If there is already enough in the queue for one address (even if those
             * aren't for this particular endpoint) we skip over this meta request for now. */
            if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
                                        client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
                aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
                aws_linked_list_push_back(
                    &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
                continue;
            }

            struct aws_s3_request *request = NULL;

            /* Try to grab the next request from the meta request. */
            bool work_remaining = aws_s3_meta_request_update(meta_request, pass_flags[pass_index], &request);

            if (work_remaining) {
                /* If there is work remaining, but we didn't get a request back, take the meta request out of the
                 * list so that we don't use it again during this function, with the intention of putting it back in
                 * the list before this function ends. */
                if (request == NULL) {
                    aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
                    aws_linked_list_push_back(
                        &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
                } else {
                    request->tracked_by_client = true;

                    ++client->threaded_data.num_requests_being_prepared;

                    num_requests_in_flight =
                        (uint32_t)aws_atomic_fetch_add(&client->stats.num_requests_in_flight, 1) + 1;

                    aws_s3_meta_request_prepare_request(
                        meta_request, request, s_s3_client_prepare_callback_queue_request, client);
                }
            } else {
                s_s3_client_remove_meta_request_threaded(client, meta_request);
            }
        }

        aws_linked_list_move_all_front(&client->threaded_data.meta_requests, &meta_requests_work_remaining);
    }
}