code/KustoCopyConsole/Concurrency/AsyncCache.cs (61 lines of code) (raw):
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace KustoCopyConsole.Concurrency
{
internal class AsyncCache<T>
{
#region Inner Types
private record CacheNode(
TaskCompletionSource EnterSource,
TaskCompletionSource ExitSource,
bool IsItemAvailable,
T? Item,
DateTime ExpirationTime);
#endregion
private readonly Func<Task<(TimeSpan, T)>> _asyncFetchFunction;
private volatile CacheNode _cacheNode = new CacheNode(
new TaskCompletionSource(),
new TaskCompletionSource(),
false,
(T?)default,
DateTime.Now);
public AsyncCache(Func<Task<(TimeSpan, T)>> asyncFetchFunction)
{
_asyncFetchFunction = asyncFetchFunction;
}
public async Task<T> GetCacheItemAsync(CancellationToken ct)
{
while (true)
{
var cacheNode = _cacheNode;
if (cacheNode.IsItemAvailable && cacheNode.ExpirationTime > DateTime.Now)
{
return cacheNode.Item!;
}
else
{ // Try to "enter" the cache node: this might be a competition
if (cacheNode.EnterSource.TrySetResult())
{
var result = await _asyncFetchFunction();
var newCacheNode = new CacheNode(
new TaskCompletionSource(),
new TaskCompletionSource(),
true,
result.Item2,
DateTime.Now.Add(result.Item1));
Interlocked.Exchange(ref _cacheNode, newCacheNode);
cacheNode.ExitSource.SetResult();
return newCacheNode.Item!;
}
else
{ // Fail to enter the cache node: another thread will fetch the item
await cacheNode.ExitSource.Task;
}
}
}
}
}
}