in source/s3_client.c [1130:1224]
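/* Forward declaration. Judging by its name and its use as the prepare callback below, this queues a prepared
 * request for sending on success and surfaces the error code otherwise. */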
static void s_s3_client_prepare_callback_queue_request(
    struct aws_s3_meta_request *meta_request,
    struct aws_s3_request *request,
    int error_code,
    void *user_data);

void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
    AWS_PRECONDITION(client);

    const uint32_t max_requests_in_flight = aws_s3_client_get_max_requests_in_flight(client);
    const uint32_t max_requests_prepare = aws_s3_client_get_max_requests_prepare(client);

    struct aws_linked_list meta_requests_work_remaining;
    aws_linked_list_init(&meta_requests_work_remaining);

    uint32_t num_requests_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);

    const uint32_t pass_flags[] = {
        AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE,
        0,
    };

    const uint32_t num_passes = AWS_ARRAY_SIZE(pass_flags);
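
    /* Two passes over the meta request list: the first pass uses the CONSERVATIVE update flag so each meta request
     * can be less aggressive about handing out new work; the second pass (no flags) lets meta requests fill whatever
     * preparation capacity remains. */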
    for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) {

        /* While:
         * * Number of being-prepared + already-prepared-and-queued requests is less than the max that can be in the
         *   preparation stage.
         * * Total number of requests tracked by the client is less than the max tracked ("in flight") requests.
         * * There are meta requests to get requests from.
         *
         * Then update meta requests to get new requests that can then be prepared (reading from any streams,
         * signing, etc.) for sending.
         */
        while ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) <
                   max_requests_prepare &&
               num_requests_in_flight < max_requests_in_flight &&
               !aws_linked_list_empty(&client->threaded_data.meta_requests)) {
            struct aws_linked_list_node *meta_request_node =
                aws_linked_list_begin(&client->threaded_data.meta_requests);

            struct aws_s3_meta_request *meta_request =
                AWS_CONTAINER_OF(meta_request_node, struct aws_s3_meta_request, client_process_work_threaded_data);

            struct aws_s3_endpoint *endpoint = meta_request->endpoint;
            AWS_ASSERT(endpoint != NULL);

            AWS_ASSERT(client->vtable->get_host_address_count);
            size_t num_known_vips = client->vtable->get_host_address_count(
                client->client_bootstrap->host_resolver,
                endpoint->host_name,
                AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);

            /* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed
             * in ramping up requests just yet. If there is already enough in the queue for one address (even if
             * those aren't for this particular endpoint), we skip over this meta request for now. */
            if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
                                        client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
                aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
                aws_linked_list_push_back(
                    &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
                continue;
            }
            struct aws_s3_request *request = NULL;

            /* Try to grab the next request from the meta request. */
            bool work_remaining = aws_s3_meta_request_update(meta_request, pass_flags[pass_index], &request);

            if (work_remaining) {
                /* If there is work remaining, but we didn't get a request back, take the meta request out of the
                 * list so that we don't use it again during this function, with the intention of putting it back in
                 * the list before this function ends. */
                if (request == NULL) {
                    aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
                    aws_linked_list_push_back(
                        &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
                } else {
                    request->tracked_by_client = true;

                    ++client->threaded_data.num_requests_being_prepared;

                    num_requests_in_flight =
                        (uint32_t)aws_atomic_fetch_add(&client->stats.num_requests_in_flight, 1) + 1;

                    aws_s3_meta_request_prepare_request(
                        meta_request, request, s_s3_client_prepare_callback_queue_request, client);
                }
            } else {
                s_s3_client_remove_meta_request_threaded(client, meta_request);
            }
        }
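
        /* Meta requests that were set aside above (no request returned yet, or endpoint still resolving) go back to
         * the front of the client's list so they are revisited on the next pass or the next client update. */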
        aws_linked_list_move_all_front(&client->threaded_data.meta_requests, &meta_requests_work_remaining);
    }
}