src/assets/Azure.Core.Shared/AsyncLockWithValue.cs (182 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.Pipeline;
namespace Azure.Core
{
/// <summary>
/// Primitive that combines async lock and value cache
/// </summary>
/// <typeparam name="T"></typeparam>
internal sealed class AsyncLockWithValue<T>
{
private readonly object _syncObj = new();
private Queue<TaskCompletionSource<LockOrValue>>? _waiters;
private bool _isLocked;
private bool _hasValue;
private long _index;
private T? _value;
public bool HasValue
{
get
{
lock (_syncObj)
{
return _hasValue;
}
}
}
public AsyncLockWithValue() { }
public AsyncLockWithValue(T value)
{
_hasValue = true;
_value = value;
}
public bool TryGetValue(out T? value)
{
lock (_syncObj)
{
if (_hasValue)
{
value = _value;
return true;
}
}
value = default;
return false;
}
/// <summary>
/// Method that either returns cached value or acquire a lock.
/// If one caller has acquired a lock, other callers will be waiting for the lock to be released.
/// If value is set, lock is released and all waiters get that value.
/// If value isn't set, the next waiter in the queue will get the lock.
/// </summary>
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async ValueTask<LockOrValue> GetLockOrValueAsync(bool async, CancellationToken cancellationToken = default)
{
TaskCompletionSource<LockOrValue> valueTcs;
lock (_syncObj)
{
// If there is a value, just return it
if (_hasValue)
{
return new LockOrValue(_value!);
}
// If lock isn't acquire yet, acquire it and return to the caller
if (!_isLocked)
{
_isLocked = true;
_index = unchecked(_index + 1);
return new LockOrValue(this, _index);
}
// Check cancellationToken before instantiating waiter
cancellationToken.ThrowIfCancellationRequested();
// If lock is already taken, create a waiter and wait either until value is set or lock can be acquired by this waiter
_waiters ??= new Queue<TaskCompletionSource<LockOrValue>>();
// if async == false, valueTcs will be waited only in this thread and only synchronously, so RunContinuationsAsynchronously isn't needed.
valueTcs = new TaskCompletionSource<LockOrValue>(async ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
_waiters.Enqueue(valueTcs);
}
try
{
if (async)
{
return await valueTcs.Task.AwaitWithCancellation(cancellationToken);
}
#pragma warning disable AZC0104 // Use EnsureCompleted() directly on asynchronous method return value.
#pragma warning disable AZC0111 // DO NOT use EnsureCompleted in possibly asynchronous scope.
valueTcs.Task.Wait(cancellationToken);
return valueTcs.Task.EnsureCompleted();
#pragma warning restore AZC0111 // DO NOT use EnsureCompleted in possibly asynchronous scope.
#pragma warning restore AZC0104 // Use EnsureCompleted() directly on asynchronous method return value.
}
catch (OperationCanceledException)
{
// Throw OperationCanceledException only if another thread hasn't set a value to this waiter
// by calling either Reset or SetValue
if (valueTcs.TrySetCanceled(cancellationToken))
{
throw;
}
return valueTcs.Task.Result;
}
}
/// <summary>
/// Set value to the cache and to all the waiters
/// </summary>
/// <param name="value"></param>
/// <param name="lockIndex"></param>
private void SetValue(T value, in long lockIndex)
{
Queue<TaskCompletionSource<LockOrValue>> waiters;
lock (_syncObj)
{
if (lockIndex != _index)
{
throw new InvalidOperationException($"Disposed {nameof(LockOrValue)} tries to set value. Current index: {_index}, {nameof(LockOrValue)} index: {lockIndex}");
}
_value = value;
_hasValue = true;
_index = 0;
_isLocked = false;
if (_waiters == default)
{
return;
}
waiters = _waiters;
_waiters = default;
}
while (waiters.Count > 0)
{
waiters.Dequeue().TrySetResult(new LockOrValue(value));
}
}
/// <summary>
/// Release the lock and allow next waiter acquire it
/// </summary>
private void Reset(in long lockIndex)
{
UnlockOrGetNextWaiter(lockIndex, out var nextWaiter);
while (nextWaiter != default && !nextWaiter.TrySetResult(new LockOrValue(this, unchecked(lockIndex + 1))))
{
UnlockOrGetNextWaiter(lockIndex, out nextWaiter);
}
}
private void UnlockOrGetNextWaiter(in long lockIndex, out TaskCompletionSource<LockOrValue>? nextWaiter)
{
lock (_syncObj)
{
nextWaiter = default;
// If lock isn't acquired, just return
if (!_isLocked || lockIndex != _index)
{
return;
}
_index = unchecked(lockIndex + 1);
// If lock was acquired, but there are no waiters, unlock and return
if (_waiters == default)
{
_isLocked = false;
return;
}
// Find the next waiter
while (_waiters.Count > 0)
{
nextWaiter = _waiters.Dequeue();
if (!nextWaiter.Task.IsCompleted)
{
// Return the waiter only if it wasn't canceled already
return;
}
}
// If no next waiter has been found, unlock and return
_isLocked = false;
}
}
public readonly struct LockOrValue : IDisposable
{
private readonly AsyncLockWithValue<T>? _owner;
private readonly T? _value;
private readonly long _index;
/// <summary>
/// Returns true if lock contains the cached value. Otherwise false.
/// </summary>
public bool HasValue => _owner == default;
/// <summary>
/// Returns cached value if it was set when lock has been created. Throws exception otherwise.
/// </summary>
/// <exception cref="InvalidOperationException">Value isn't set.</exception>
public T Value => HasValue ? _value! : throw new InvalidOperationException("Value isn't set");
public LockOrValue(T value)
{
_owner = default;
_value = value;
_index = 0;
}
public LockOrValue(AsyncLockWithValue<T> owner, long index)
{
_owner = owner;
_index = index;
_value = default;
}
/// <summary>
/// Set value to the cache and to all the waiters.
/// </summary>
/// <param name="value"></param>
/// <exception cref="InvalidOperationException">Value is set already.</exception>
public void SetValue(T value)
{
if (_owner != null)
{
_owner.SetValue(value, _index);
}
else
{
throw new InvalidOperationException("Value for the lock is set already");
}
}
public void Dispose() => _owner?.Reset(_index);
}
}
}