static int s_standard_retry_acquire_token()

in source/standard_retry_strategy.c [130:274]


/**
 * Starts acquisition of a retry token from the standard retry strategy for one partition.
 *
 * A NULL or zero-length partition_id is mapped to the shared empty-string partition. The capacity bucket for the
 * partition is looked up under the strategy lock; if none exists, a new bucket is created with the strategy's
 * max_capacity and inserted into the table. A token bound to that bucket is then passed (as user data) to the
 * backing exponential backoff strategy together with s_on_standard_retry_token_acquired. on_acquired and user_data
 * are stored on the token.
 *
 * @param retry_strategy  standard retry strategy; its impl is a struct standard_strategy
 * @param partition_id    partition to throttle on, may be NULL or empty
 * @param on_acquired     caller's callback, stored on the token
 * @param user_data       caller's user data, stored on the token
 * @param timeout_ms      forwarded unchanged to the backing strategy
 *
 * @return AWS_OP_SUCCESS if the backing strategy accepted the request, AWS_OP_ERR otherwise. On AWS_OP_ERR the
 *         token allocated here has been released by this function.
 */
static int s_standard_retry_acquire_token(
    struct aws_retry_strategy *retry_strategy,
    const struct aws_byte_cursor *partition_id,
    aws_retry_strategy_on_retry_token_acquired_fn *on_acquired,
    void *user_data,
    uint64_t timeout_ms) {
    struct standard_strategy *standard_strategy = retry_strategy->impl;
    /* true while a freshly allocated bucket is still owned by this function and not by the table */
    bool bucket_needs_cleanup = false;
    /* true once this call has inserted a brand new bucket into the table */
    bool bucket_created = false;

    const struct aws_byte_cursor *partition_id_ptr =
        !partition_id || partition_id->len == 0 ? &s_empty_string_cur : partition_id;

    AWS_LOGF_DEBUG(
        AWS_LS_IO_STANDARD_RETRY_STRATEGY,
        "id=%p: attempting to acquire retry token for partition_id " PRInSTR,
        (void *)retry_strategy,
        AWS_BYTE_CURSOR_PRI(*partition_id_ptr));

    struct retry_bucket_token *token = aws_mem_calloc(retry_strategy->allocator, 1, sizeof(struct retry_bucket_token));
    if (!token) {
        return AWS_OP_ERR;
    }

    token->original_user_data = user_data;
    token->original_on_acquired = on_acquired;

    /* Set up the ref-counted part of the token before anything can fail. Every error path below ends in
     * aws_retry_token_release(); with a zeroed ref-count and a NULL retry_strategy that call could not dispose of
     * the token. NOTE(review): this relies on the final release also dropping the strategy reference taken
     * here, same as the pre-existing failure path after the backing acquire -- confirm against release_token. */
    token->retry_token.allocator = retry_strategy->allocator;
    token->retry_token.retry_strategy = retry_strategy;
    token->retry_token.impl = token;
    aws_atomic_init_int(&token->retry_token.ref_count, 1u);
    aws_retry_strategy_acquire(retry_strategy);

    struct aws_hash_element *element_ptr = NULL;
    struct retry_bucket *bucket_ptr = NULL;
    AWS_FATAL_ASSERT(!aws_mutex_lock(&standard_strategy->synced_data.lock) && "Lock acquisition failed.");
    aws_hash_table_find(&standard_strategy->synced_data.token_buckets, partition_id_ptr, &element_ptr);
    if (!element_ptr) {
        AWS_LOGF_DEBUG(
            AWS_LS_IO_STANDARD_RETRY_STRATEGY,
            "id=%p: bucket for partition_id " PRInSTR " does not exist, attempting to create one",
            (void *)retry_strategy,
            AWS_BYTE_CURSOR_PRI(*partition_id_ptr));
        bucket_ptr = aws_mem_calloc(standard_strategy->base.allocator, 1, sizeof(struct retry_bucket));

        if (!bucket_ptr) {
            AWS_LOGF_ERROR(
                AWS_LS_IO_STANDARD_RETRY_STRATEGY,
                "id=%p: error when allocating bucket %s",
                (void *)retry_strategy,
                aws_error_debug_str(aws_last_error()));
            goto table_locked;
        }

        bucket_needs_cleanup = true;
        bucket_ptr->allocator = standard_strategy->base.allocator;
        /* the empty partition reuses the static empty string instead of allocating a copy */
        bucket_ptr->partition_id = partition_id_ptr->len > 0
                                       ? aws_string_new_from_cursor(standard_strategy->base.allocator, partition_id_ptr)
                                       : (struct aws_string *)s_empty_string;

        if (!bucket_ptr->partition_id) {
            AWS_LOGF_ERROR(
                AWS_LS_IO_STANDARD_RETRY_STRATEGY,
                "id=%p: error when allocating partition_id %s",
                (void *)retry_strategy,
                aws_error_debug_str(aws_last_error()));
            goto table_locked;
        }

        /* the table key must point at memory owned by the bucket, not at the caller's cursor */
        bucket_ptr->partition_id_cur = aws_byte_cursor_from_string(bucket_ptr->partition_id);
        AWS_FATAL_ASSERT(!aws_mutex_init(&bucket_ptr->synced_data.partition_lock) && "mutex init failed!");
        bucket_ptr->owner = retry_strategy;
        bucket_ptr->synced_data.current_capacity = standard_strategy->max_capacity;
        AWS_LOGF_DEBUG(
            AWS_LS_IO_STANDARD_RETRY_STRATEGY,
            "id=%p: bucket %p for partition_id " PRInSTR " created",
            (void *)retry_strategy,
            (void *)bucket_ptr,
            AWS_BYTE_CURSOR_PRI(*partition_id_ptr));

        if (aws_hash_table_put(
                &standard_strategy->synced_data.token_buckets, &bucket_ptr->partition_id_cur, bucket_ptr, NULL)) {
            AWS_LOGF_ERROR(
                AWS_LS_IO_STANDARD_RETRY_STRATEGY,
                "id=%p: error when putting bucket to token_bucket table %s",
                (void *)retry_strategy,
                aws_error_debug_str(aws_last_error()));
            goto table_locked;
        }
        /* the table holds the bucket from here on */
        bucket_needs_cleanup = false;
        bucket_created = true;
    } else {
        bucket_ptr = element_ptr->value;
        AWS_LOGF_DEBUG(
            AWS_LS_IO_STANDARD_RETRY_STRATEGY,
            "id=%p: bucket %p for partition_id " PRInSTR " found",
            (void *)retry_strategy,
            (void *)bucket_ptr,
            AWS_BYTE_CURSOR_PRI(*partition_id_ptr));
    }
    AWS_FATAL_ASSERT(!aws_mutex_unlock(&standard_strategy->synced_data.lock) && "Mutex unlock failed");

    token->strategy_bucket = bucket_ptr;

    /* don't decrement the capacity counter, but add the retry payback, so making calls that succeed allows for a
     * gradual recovery of the bucket capacity. Otherwise, we'd never recover from an outage. */
    token->last_retry_cost = s_standard_no_retry_cost;

    AWS_LOGF_TRACE(
        AWS_LS_IO_STANDARD_RETRY_STRATEGY,
        "id=%p: allocated token %p for partition_id " PRInSTR,
        (void *)retry_strategy,
        (void *)&token->retry_token,
        AWS_BYTE_CURSOR_PRI(*partition_id_ptr));

    if (aws_retry_strategy_acquire_retry_token(
            standard_strategy->exponential_backoff_retry_strategy,
            partition_id_ptr,
            s_on_standard_retry_token_acquired,
            token,
            timeout_ms)) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_STANDARD_RETRY_STRATEGY,
            "id=%p: error when acquiring retry token from backing retry strategy %p: %s",
            (void *)retry_strategy,
            (void *)standard_strategy->exponential_backoff_retry_strategy,
            aws_error_debug_str(aws_last_error()));

        /* Roll back only a bucket this call inserted. A bucket that was already in the table is pointed to by
         * tokens handed out earlier (via strategy_bucket) and carries the partition's accumulated capacity, so it
         * must stay. The removal presumably disposes of the bucket through the table's value destructor, which is
         * why it is not destroyed explicitly here -- confirm against the table's init call.
         * NOTE(review): another thread may have looked up the new bucket after the unlock above; evicting it here
         * is still racy in that window. */
        if (bucket_created) {
            AWS_FATAL_ASSERT(!aws_mutex_lock(&standard_strategy->synced_data.lock) && "Mutex lock failed");
            aws_hash_table_remove(
                &standard_strategy->synced_data.token_buckets, &bucket_ptr->partition_id_cur, NULL, NULL);
            AWS_FATAL_ASSERT(!aws_mutex_unlock(&standard_strategy->synced_data.lock) && "Mutex unlock failed");
        }

        aws_retry_token_release(&token->retry_token);
        return AWS_OP_ERR;
    }

    return AWS_OP_SUCCESS;

table_locked:
    /* reached with the strategy lock held and the bucket (if any) not yet in the table */
    AWS_FATAL_ASSERT(!aws_mutex_unlock(&standard_strategy->synced_data.lock) && "Mutex unlock failed");

    if (bucket_needs_cleanup) {
        s_destroy_standard_retry_bucket(bucket_ptr);
    }

    aws_retry_token_release(&token->retry_token);

    return AWS_OP_ERR;
}