in source/standard_retry_strategy.c [130:274]
static int s_standard_retry_acquire_token(
struct aws_retry_strategy *retry_strategy,
const struct aws_byte_cursor *partition_id,
aws_retry_strategy_on_retry_token_acquired_fn *on_acquired,
void *user_data,
uint64_t timeout_ms) {
struct standard_strategy *standard_strategy = retry_strategy->impl;
bool bucket_needs_cleanup = false;
const struct aws_byte_cursor *partition_id_ptr =
!partition_id || partition_id->len == 0 ? &s_empty_string_cur : partition_id;
AWS_LOGF_DEBUG(
AWS_LS_IO_STANDARD_RETRY_STRATEGY,
"id=%p: attempting to acquire retry token for partition_id " PRInSTR,
(void *)retry_strategy,
AWS_BYTE_CURSOR_PRI(*partition_id_ptr));
struct retry_bucket_token *token = aws_mem_calloc(retry_strategy->allocator, 1, sizeof(struct retry_bucket_token));
if (!token) {
return AWS_OP_ERR;
}
token->original_user_data = user_data;
token->original_on_acquired = on_acquired;
struct aws_hash_element *element_ptr;
struct retry_bucket *bucket_ptr;
AWS_FATAL_ASSERT(!aws_mutex_lock(&standard_strategy->synced_data.lock) && "Lock acquisition failed.");
aws_hash_table_find(&standard_strategy->synced_data.token_buckets, partition_id_ptr, &element_ptr);
if (!element_ptr) {
AWS_LOGF_DEBUG(
AWS_LS_IO_STANDARD_RETRY_STRATEGY,
"id=%p: bucket for partition_id " PRInSTR " does not exist, attempting to create one",
(void *)retry_strategy,
AWS_BYTE_CURSOR_PRI(*partition_id_ptr));
bucket_ptr = aws_mem_calloc(standard_strategy->base.allocator, 1, sizeof(struct retry_bucket));
if (!bucket_ptr) {
AWS_LOGF_ERROR(
AWS_LS_IO_STANDARD_RETRY_STRATEGY,
"id=%p: error when allocating bucket %s",
(void *)retry_strategy,
aws_error_debug_str(aws_last_error()));
goto table_locked;
}
bucket_needs_cleanup = true;
bucket_ptr->allocator = standard_strategy->base.allocator;
bucket_ptr->partition_id = partition_id_ptr->len > 0
? aws_string_new_from_cursor(standard_strategy->base.allocator, partition_id)
: (struct aws_string *)s_empty_string;
if (!bucket_ptr->partition_id) {
AWS_LOGF_ERROR(
AWS_LS_IO_STANDARD_RETRY_STRATEGY,
"id=%p: error when allocating partition_id %s",
(void *)retry_strategy,
aws_error_debug_str(aws_last_error()));
goto table_locked;
}
bucket_ptr->partition_id_cur = aws_byte_cursor_from_string(bucket_ptr->partition_id);
AWS_FATAL_ASSERT(!aws_mutex_init(&bucket_ptr->synced_data.partition_lock) && "mutex init failed!");
bucket_ptr->owner = retry_strategy;
bucket_ptr->synced_data.current_capacity = standard_strategy->max_capacity;
AWS_LOGF_DEBUG(
AWS_LS_IO_STANDARD_RETRY_STRATEGY,
"id=%p: bucket %p for partition_id " PRInSTR " created",
(void *)retry_strategy,
(void *)bucket_ptr,
AWS_BYTE_CURSOR_PRI(*partition_id_ptr));
if (aws_hash_table_put(
&standard_strategy->synced_data.token_buckets, &bucket_ptr->partition_id_cur, bucket_ptr, NULL)) {
AWS_LOGF_ERROR(
AWS_LS_IO_STANDARD_RETRY_STRATEGY,
"id=%p: error when putting bucket to token_bucket table %s",
(void *)retry_strategy,
aws_error_debug_str(aws_last_error()));
goto table_locked;
}
bucket_needs_cleanup = false;
} else {
bucket_ptr = element_ptr->value;
AWS_LOGF_DEBUG(
AWS_LS_IO_STANDARD_RETRY_STRATEGY,
"id=%p: bucket %p for partition_id " PRInSTR " found",
(void *)retry_strategy,
(void *)bucket_ptr,
AWS_BYTE_CURSOR_PRI(*partition_id_ptr));
}
AWS_FATAL_ASSERT(!aws_mutex_unlock(&standard_strategy->synced_data.lock) && "Mutex unlock failed");
token->strategy_bucket = bucket_ptr;
token->retry_token.retry_strategy = retry_strategy;
aws_atomic_init_int(&token->retry_token.ref_count, 1u);
aws_retry_strategy_acquire(retry_strategy);
token->retry_token.allocator = retry_strategy->allocator;
token->retry_token.impl = token;
/* don't decrement the capacity counter, but add the retry payback, so making calls that succeed allows for a
* gradual recovery of the bucket capacity. Otherwise, we'd never recover from an outage. */
token->last_retry_cost = s_standard_no_retry_cost;
AWS_LOGF_TRACE(
AWS_LS_IO_STANDARD_RETRY_STRATEGY,
"id=%p: allocated token %p for partition_id " PRInSTR,
(void *)retry_strategy,
(void *)&token->retry_token,
AWS_BYTE_CURSOR_PRI(*partition_id_ptr));
if (aws_retry_strategy_acquire_retry_token(
standard_strategy->exponential_backoff_retry_strategy,
partition_id_ptr,
s_on_standard_retry_token_acquired,
token,
timeout_ms)) {
AWS_LOGF_ERROR(
AWS_LS_IO_STANDARD_RETRY_STRATEGY,
"id=%p: error when acquiring retry token from backing retry strategy %p: %s",
(void *)retry_strategy,
(void *)standard_strategy->exponential_backoff_retry_strategy,
aws_error_debug_str(aws_last_error()));
goto table_updated;
}
return AWS_OP_SUCCESS;
table_updated:
AWS_FATAL_ASSERT(!aws_mutex_lock(&standard_strategy->synced_data.lock) && "Mutex lock failed");
aws_hash_table_remove(&standard_strategy->synced_data.token_buckets, &bucket_ptr->partition_id_cur, NULL, NULL);
bucket_needs_cleanup = false;
table_locked:
AWS_FATAL_ASSERT(!aws_mutex_unlock(&standard_strategy->synced_data.lock) && "Mutex unlock failed");
if (bucket_needs_cleanup) {
s_destroy_standard_retry_bucket(bucket_ptr);
}
aws_retry_token_release(&token->retry_token);
return AWS_OP_ERR;
}