in src/Microsoft.ServiceFabric.Services/Communication/Client/CommunicationClientFactoryBase.cs [363:505]
protected abstract void AbortClient(TCommunicationClient client);
/// <summary>
/// Obtains a communication client for the given partition, looping until a client is
/// returned or the exception handling path decides the failure has to be surfaced.
/// </summary>
/// <param name="previousRsp">Partition information to start from; it is replaced by the result of every re-resolution.</param>
/// <param name="targetReplicaSelector">Forwarded to <c>GetEndpoint</c> and to the exception information handed to the exception handler.</param>
/// <param name="listenerName">Listener name forwarded to the cache lookup, client creation and cache entry validation.</param>
/// <param name="retrySettings">Supplies the retry policy used for the re-validation delay and is forwarded to the exception handler.</param>
/// <param name="doInitialResolve">Whether the first iteration resolves the partition before computing the endpoint.</param>
/// <param name="cancellationToken">Token forwarded to resolution, cache entry locking, client creation and the retry delays.</param>
/// <returns>The client that was newly created or obtained from the validated cache entry.</returns>
/// <exception cref="AggregateException">
/// Wraps the last caught exception when <c>Utility.ShouldRetryOperation</c> reports that no further retry is allowed.
/// </exception>
private async Task<TCommunicationClient> CreateClientWithRetriesAsync(
    ResolvedServicePartition previousRsp,
    TargetReplicaSelector targetReplicaSelector,
    string listenerName,
    OperationRetrySettings retrySettings,
    bool doInitialResolve,
    CancellationToken cancellationToken)
{
    // Reuse the request id tracked for the current request; otherwise make one up for tracing.
    if (!ClientRequestTracker.TryGet(out string requestId))
    {
        requestId = Guid.NewGuid().ToString();
    }

    var needsResolve = doInitialResolve;
    var attempt = 0;
    var retriesForCurrentException = 0;
    string lastExceptionId = null;

    for (;;)
    {
        ExceptionHandlingResult handlingResult;
        Exception lastException;

        try
        {
            if (needsResolve)
            {
                previousRsp = await this.ServiceResolver.ResolveAsync(
                    previousRsp,
                    ServicePartitionResolver.DefaultResolveTimeout,
                    defaultDelay,
                    cancellationToken);
            }

            var endpoint = this.GetEndpoint(previousRsp, targetReplicaSelector);
            var entry = await this.GetAndLockClientCacheEntryAsync(
                previousRsp.Info.Id,
                endpoint,
                listenerName,
                previousRsp,
                cancellationToken);

            var createdNew = false;
            var entryIsValid = true;
            var client = default(TCommunicationClient);
            try
            {
                // The cached communication client can be invalid in two situations:
                // 1. The endpoint and RSP information is valid, but nobody was using the
                //    communication client any more, so the last reference to it was GC'd.
                // 2. Communication with the endpoint failed, the ReportOperationException
                //    code path ran and the communication client was invalidated.
                if (this.ShouldCreateNewClient(entry))
                {
                    client = await this.CreateNewClientAsync(listenerName, requestId, previousRsp, entry, cancellationToken);
                    createdNew = true;
                }
                else
                {
                    entryIsValid = this.ValidateClientCacheEntry(
                        entry,
                        previousRsp,
                        requestId,
                        endpoint,
                        listenerName,
                        attempt,
                        out client);
                }
            }
            finally
            {
                // The entry's semaphore is released whether or not creation/validation succeeded.
                entry.Semaphore.Release();
            }

            if (entryIsValid)
            {
                // Connect events are raised only for clients created in this call.
                if (client != null && createdNew && this.fireConnectEvents)
                {
                    this.OnClientConnected(client);
                }

                return client;
            }

            // The cache entry failed validation: force a fresh resolve, back off, and try again.
            needsResolve = true;
            var delayParameters = new RetryDelayParameters(attempt++, false);
            var revalidateDelay = retrySettings.RetryPolicy.GetNextRetryDelay(delayParameters);
            await Task.Delay(revalidateDelay, cancellationToken);
            continue;
        }
        catch (Exception e)
        {
            ServiceTrace.Source.WriteInfoWithId(
                TraceType,
                requestId,
                "{0} Exception While CreatingClient {1}",
                this.traceId,
                e);

            var exceptionInformation = new ExceptionInformation(e, targetReplicaSelector);
            var handled = this.HandleReportedException(exceptionInformation, retrySettings, out handlingResult);
            if (!handled)
            {
                throw;
            }

            if (handlingResult is ExceptionHandlingThrowResult throwResult)
            {
                if (!ReferenceEquals(e, throwResult.ExceptionToThrow))
                {
                    throw throwResult.ExceptionToThrow;
                }

                // Same exception instance: rethrow so the original stack trace is kept.
                throw;
            }

            // Remember the exception so it can be thrown if the retry policy gives up.
            lastException = e;
        }

        var retryResult = (ExceptionHandlingRetryResult)handlingResult;
        var mayRetry = Utility.ShouldRetryOperation(
            retryResult.ExceptionId,
            retryResult.MaxRetryCount,
            ref lastExceptionId,
            ref retriesForCurrentException);
        if (!mayRetry)
        {
            ServiceTrace.Source.WriteInfoWithId(
                TraceType,
                requestId,
                "{0} Retry count for exception id {1} exceeded the retry limit : {2}, throwing exception - {3}",
                this.traceId,
                retryResult.ExceptionId,
                retryResult.MaxRetryCount,
                lastException);
            throw new AggregateException(lastException);
        }

        // Only non-transient failures trigger a re-resolve on the next iteration.
        needsResolve = !retryResult.IsTransient;
        await Task.Delay(retryResult.GetRetryDelay(attempt++), cancellationToken);
    }
}