aliyun-net-credentials/Provider/RefreshCachedSupplier.cs (299 lines of code) (raw):
using System;
using System.Threading;
using System.Threading.Tasks;
using Aliyun.Credentials.Exceptions;
using Aliyun.Credentials.Logging;
using Aliyun.Credentials.Policy;
using Aliyun.Credentials.Utils;
namespace Aliyun.Credentials.Provider
{
public class RefreshCachedSupplier<T>
{
private static readonly ILog Logger = LogProvider.For<RefreshCachedSupplier<T>>();
private const long StaleTime = 15 * 60 * 1000;
/// <summary>
/// Maximum time to wait for a blocking refresh lock before calling refresh again. Unit of milliseconds.
/// </summary>
private const int RefreshBlockingMaxWait = 5 * 1000;
private readonly object refreshLock = new object();
private readonly SemaphoreSlim refreshLockAsync = new SemaphoreSlim(1, 1);
private volatile RefreshResult<T> cachedValue;
private readonly Func<RefreshResult<T>> refreshFunc;
private readonly Func<Task<RefreshResult<T>>> refreshFuncAsync;
private volatile int consecutiveRefreshFailures;
private readonly StaleValueBehavior staleValueBehavior;
private readonly Random jitter = new Random();
private readonly IPrefetchStrategy prefetchStrategy;
public RefreshCachedSupplier(Func<RefreshResult<T>> refreshFunc,
Func<Task<RefreshResult<T>>> refreshFuncAsync)
{
this.refreshFunc =
ParameterHelper.ValidateNotNull(refreshFunc, "refreshFunc", "RefreshFunc must not be null.");
this.refreshFuncAsync = ParameterHelper.ValidateNotNull(refreshFuncAsync, "refreshFuncAsync",
"RefreshFuncAsync must not be null.");
this.staleValueBehavior = StaleValueBehavior.Strict;
this.prefetchStrategy = new OneCallerBlocks();
}
private RefreshCachedSupplier(Builder builder)
{
this.staleValueBehavior = ParameterHelper.ValidateNotNull(builder.staleValueBehavior,
"this.staleValueBehavior", "StaleValueBehavior is null.");
ParameterHelper.ValidateNotNull(builder.jitterEnabled, "jitterEnabled", "JitterEnabled is null.");
this.refreshFunc =
ParameterHelper.ValidateNotNull(builder.refreshFunc, "refreshFunc", "Refresh Function is null.");
this.refreshFuncAsync =
ParameterHelper.ValidateNotNull(builder.refreshFuncAsync, "refreshFuncAsync",
"RefreshAsync Function is null.");
this.prefetchStrategy = builder.asyncUpdateEnabled
? (IPrefetchStrategy)new NonBlocking()
: new OneCallerBlocks();
}
public T Get()
{
if (CacheIsStale())
{
Logger.Debug("Refreshing credentials synchronously");
RefreshCache();
}
else if (ShouldInitiateCachePrefetch())
{
Logger.Debug("Prefetching credentials, using prefetch strategy: {0}",
this.prefetchStrategy.ToString());
PrefetchCache();
}
return this.cachedValue.Value;
}
public async Task<T> GetAsync()
{
if (CacheIsStale())
{
Logger.Debug("Refreshing credentials synchronously");
await RefreshCacheAsync();
}
else if (ShouldInitiateCachePrefetch())
{
Logger.Debug("Prefetching credentials, using prefetch strategy: {0}",
this.prefetchStrategy.ToString());
await PrefetchCacheAsync();
}
return this.cachedValue.Value;
}
private void PrefetchCache()
{
this.prefetchStrategy.Prefetch(RefreshCache);
}
private async Task PrefetchCacheAsync()
{
await this.prefetchStrategy.PrefetchAsync(RefreshCacheAsync);
}
private void RefreshCache()
{
var lockTaken = false;
try
{
lockTaken = Monitor.TryEnter(this.refreshLock, RefreshBlockingMaxWait);
if (lockTaken && (CacheIsStale() || ShouldInitiateCachePrefetch()))
{
try
{
this.cachedValue = HandleFetchedSuccess(this.refreshFunc());
}
catch (Exception ex)
{
this.cachedValue = HandleFetchedFailure(ex);
}
}
}
catch (ThreadInterruptedException)
{
Thread.CurrentThread.Interrupt();
throw new InvalidOperationException("Interrupted waiting to refresh the value.");
}
catch (CredentialException)
{
throw;
}
catch (Exception ex)
{
throw new CredentialException("Failed to refresh credentials.", ex.Message);
}
finally
{
if (lockTaken)
{
Monitor.Exit(this.refreshLock);
}
}
}
private RefreshResult<T> HandleFetchedSuccess(RefreshResult<T> value)
{
Logger.Debug(string.Format("Refresh credentials successfully, retrieved value is {0}, cached value is {1}",
value, this.cachedValue));
Interlocked.Exchange(ref consecutiveRefreshFailures, 0);
var now = DateTime.UtcNow.GetTimeMillis();
// 过期时间大于15分钟,不用管
if (now < value.StaleTime)
{
Logger.Debug(string.Format("Retrieved value stale time is {0}. Using staleTime of {1}",
ParameterHelper.FormatIso8601Date(value.StaleTime),
ParameterHelper.FormatIso8601Date(value.StaleTime)));
return value;
}
// 不足或等于15分钟,但未过期,下次会再次刷新
if (now < value.StaleTime + StaleTime)
{
Logger.Warn(string.Format("Retrieved value stale time is %s in the past ({0}). Using staleTime of {1}",
ParameterHelper.FormatIso8601Date(value.StaleTime),
ParameterHelper.FormatIso8601Date(now)));
return value.ToBuilder().StaleTime(now).Build();
}
Logger.Warn(string.Format(
"Retrieved value expiration time of the credential is in the past ({0}). Trying use the cached value.",
ParameterHelper.FormatIso8601Date(value.StaleTime + StaleTime)));
// 已过期,看缓存,缓存若大于15分钟,返回缓存,若小于15分钟,则根据策略判断是立刻重试还是稍后重试
if (this.cachedValue == null)
{
throw new CredentialException("No cached value was found.");
}
if (now < this.cachedValue.StaleTime)
{
Logger.Warn(string.Format("Cached value staleTime is {0}. Using staleTime of {1}",
ParameterHelper.FormatIso8601Date(this.cachedValue.StaleTime),
ParameterHelper.FormatIso8601Date(this.cachedValue.StaleTime)));
return this.cachedValue;
}
switch (this.staleValueBehavior)
{
case StaleValueBehavior.Strict:
// 立马重试
Logger.Warn(string.Format(
"Cached value expiration is in the past ({0}). Using expiration of {1}",
value.StaleTime, now + 1000));
return this.cachedValue.ToBuilder().StaleTime(now + 1000).Build();
case StaleValueBehavior.Allow:
//一分钟左右重试一次
var waitUntilNextRefresh = 50 * 1000 + jitter.Next(20 * 1000 + 1);
var nextRefreshTime = now + waitUntilNextRefresh;
Logger.Warn(string.Format(
"Cached value expiration has been extended to {0} because the downstream service returned a time in the past: {1}",
nextRefreshTime, value.StaleTime));
return this.cachedValue.ToBuilder().StaleTime(nextRefreshTime).Build();
default:
throw new ArgumentException(string.Format("Unknown stale-value-behavior: {0}",
this.staleValueBehavior));
}
}
private RefreshResult<T> HandleFetchedFailure(Exception exception)
{
Logger.Warn(string.Format("Refresh credentials failed, cached value is {0}, exception is {1}", this.cachedValue, exception));
var currentCachedValue = this.cachedValue;
if (currentCachedValue == null)
{
Logger.Error(exception.Message);
throw exception;
}
var now = DateTime.UtcNow.GetTimeMillis();
if (now < currentCachedValue.StaleTime)
{
return currentCachedValue;
}
var numFailures = Interlocked.Increment(ref consecutiveRefreshFailures);
switch (this.staleValueBehavior)
{
case StaleValueBehavior.Strict:
throw exception;
case StaleValueBehavior.Allow:
// 采用退避算法,立刻重试
var newStaleTime = JitterTime(now, 1000, MaxStaleFailureJitter(numFailures));
Logger.Warn(string.Format(
"Cached value expiration has been extended to {0} because calling the downstream service failed (consecutive failures: {1}).",
newStaleTime, numFailures));
return currentCachedValue.ToBuilder().StaleTime(newStaleTime).Build();
default:
throw new ArgumentException(string.Format("Unknown stale-value-behavior: {0}",
this.staleValueBehavior));
}
}
private long JitterTime(long time, long jitterStart, long jitterEnd)
{
var jitterRange = jitterEnd - jitterStart;
var jitterAmount = Math.Abs(jitter.Next(0, (int)jitterRange));
return time + jitterStart + jitterAmount;
}
private static long MaxStaleFailureJitter(int numFailures)
{
var exponentialBackoffMillis = (1L << (numFailures - 1)) * 100;
return exponentialBackoffMillis > 10 * 1000 ? exponentialBackoffMillis : 10 * 1000;
}
private async Task RefreshCacheAsync()
{
var lockTaken = false;
try
{
lockTaken = await refreshLockAsync.WaitAsync(RefreshBlockingMaxWait);
if (lockTaken && (CacheIsStale() || ShouldInitiateCachePrefetch()))
{
try
{
this.cachedValue = HandleFetchedSuccess(await refreshFuncAsync());
}
catch (Exception ex)
{
this.cachedValue = HandleFetchedFailure(ex);
}
}
}
catch (ThreadInterruptedException)
{
Thread.CurrentThread.Interrupt();
throw new InvalidOperationException("Interrupted waiting to refresh the value.");
}
catch (CredentialException)
{
throw;
}
catch (Exception ex)
{
throw new CredentialException("Failed to refresh credentials.", ex.Message);
}
finally
{
if (lockTaken)
{
refreshLockAsync.Release();
}
}
}
private bool CacheIsStale()
{
return this.cachedValue == null || DateTime.UtcNow.GetTimeMillis() >= this.cachedValue.StaleTime;
}
private bool ShouldInitiateCachePrefetch()
{
return this.cachedValue == null || DateTime.UtcNow.GetTimeMillis() >= this.cachedValue.PrefetchTime;
}
public class Builder
{
internal readonly Func<RefreshResult<T>> refreshFunc;
internal readonly Func<Task<RefreshResult<T>>> refreshFuncAsync;
internal bool asyncUpdateEnabled;
internal bool jitterEnabled = true;
internal StaleValueBehavior staleValueBehavior = Policy.StaleValueBehavior.Strict;
internal Builder(Func<RefreshResult<T>> refreshFunc, Func<Task<RefreshResult<T>>> refreshFuncAsync)
{
this.refreshFunc = refreshFunc;
this.refreshFuncAsync = refreshFuncAsync;
}
public Builder AsyncUpdateEnabled(bool buildAsyncUpdateEnabled)
{
this.asyncUpdateEnabled = buildAsyncUpdateEnabled;
return this;
}
public Builder StaleValueBehavior(StaleValueBehavior buildStaleValueBehavior)
{
this.staleValueBehavior = buildStaleValueBehavior;
return this;
}
internal Builder JitterEnabled(bool jitterEnabledInBuilder)
{
this.jitterEnabled = jitterEnabledInBuilder;
return this;
}
public RefreshCachedSupplier<T> Build()
{
return new RefreshCachedSupplier<T>(this);
}
}
}
}