in Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs [208:376]
/// <summary>
/// Resolves the replica addresses for a partition key range through the server partition address cache.
/// The cache entry is refreshed when the caller asks for it, when the request demands a collection routing
/// map refresh, or when the range has been recorded as having fewer replicas than
/// <c>MaxReplicaSetSize</c> for longer than the configured suboptimal-partition refresh interval.
/// </summary>
/// <param name="request">The request being routed; must not be null. When present, its <c>RequestContext</c>
/// supplies the failed endpoints to mark unhealthy and the hash of the last address set this request saw.</param>
/// <param name="partitionKeyRangeIdentity">Collection RID and partition key range id to resolve; must not be null.</param>
/// <param name="serviceIdentity">Not used by this method body.</param>
/// <param name="forceRefreshPartitionAddresses">When true, ask the cache to refresh the entry (subject to the
/// hash-code check in the <c>forceRefresh</c> callback).</param>
/// <param name="cancellationToken">Not consulted by this method body.</param>
/// <returns>
/// The resolved addresses. Returns <c>null</c> when resolution fails with a <c>DocumentClientException</c>
/// whose status is 404 (NotFound), or 410 (Gone) with sub-status <c>PartitionKeyRangeGone</c>.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> or <paramref name="partitionKeyRangeIdentity"/> is null.</exception>
public async Task<PartitionAddressInformation> TryGetAddressesAsync(
    DocumentServiceRequest request,
    PartitionKeyRangeIdentity partitionKeyRangeIdentity,
    ServiceIdentity serviceIdentity,
    bool forceRefreshPartitionAddresses,
    CancellationToken cancellationToken)
{
    if (request == null)
    {
        throw new ArgumentNullException(nameof(request));
    }

    if (partitionKeyRangeIdentity == null)
    {
        throw new ArgumentNullException(nameof(partitionKeyRangeIdentity));
    }

    try
    {
        // The master partition is resolved through a separate path.
        if (partitionKeyRangeIdentity.PartitionKeyRangeId == PartitionKeyRange.MasterPartitionKeyRangeId)
        {
            return (await this.ResolveMasterAsync(request, forceRefreshPartitionAddresses)).Item2;
        }

        // A range previously seen with a short replica set is force-refreshed once its recorded timestamp is
        // older than the configured interval. Swapping the timestamp for DateTime.MaxValue means only the caller
        // that wins TryUpdate triggers that refresh; the sentinel is removed again once this call completes
        // the forced refresh, or fails (see the catch clauses below).
        if (this.suboptimalServerPartitionTimestamps.TryGetValue(partitionKeyRangeIdentity, out DateTime suboptimalServerPartitionTimestamp))
        {
            bool forceRefreshDueToSuboptimalPartitionReplicaSet =
                DateTime.UtcNow.Subtract(suboptimalServerPartitionTimestamp) > TimeSpan.FromSeconds(this.suboptimalPartitionForceRefreshIntervalInSeconds);

            if (forceRefreshDueToSuboptimalPartitionReplicaSet
                && this.suboptimalServerPartitionTimestamps.TryUpdate(partitionKeyRangeIdentity, DateTime.MaxValue, suboptimalServerPartitionTimestamp))
            {
                forceRefreshPartitionAddresses = true;
            }
        }

        PartitionAddressInformation addresses;
        PartitionAddressInformation staleAddressInfo = null;
        if (forceRefreshPartitionAddresses || request.ForceCollectionRoutingMapRefresh)
        {
            addresses = await this.serverPartitionAddressCache.GetAsync(
                key: partitionKeyRangeIdentity,
                singleValueInitFunc: (currentCachedValue) =>
                {
                    // Remember the value being replaced so the refresh can be recorded in request diagnostics.
                    staleAddressInfo = currentCachedValue;
                    GatewayAddressCache.SetTransportAddressUrisToUnhealthy(
                        currentCachedValue,
                        request.RequestContext?.FailedEndpoints);
                    return this.GetAddressesForRangeIdAsync(
                        request,
                        cachedAddresses: currentCachedValue,
                        partitionKeyRangeIdentity.CollectionRid,
                        partitionKeyRangeIdentity.PartitionKeyRangeId,
                        forceRefresh: forceRefreshPartitionAddresses);
                },
                forceRefresh: (currentCachedValue) =>
                {
                    int cachedHashCode = request.RequestContext?.LastPartitionAddressInformationHashCode ?? 0;
                    if (cachedHashCode == 0)
                    {
                        // This request has not recorded an address set yet, so always refresh.
                        return true;
                    }

                    // Refresh only if the cache still holds the value this request saw last time. A different
                    // hash means another request already replaced the entry, so the newer value is reused
                    // instead of issuing a redundant refresh.
                    return currentCachedValue.GetHashCode() == cachedHashCode;
                });

            // Guarded like every other RequestContext access in this method.
            if (staleAddressInfo != null && request.RequestContext != null)
            {
                GatewayAddressCache.LogPartitionCacheRefresh(request.RequestContext.ClientRequestStatistics, staleAddressInfo, addresses);
            }

            this.suboptimalServerPartitionTimestamps.TryRemove(partitionKeyRangeIdentity, out _);
        }
        else
        {
            addresses = await this.serverPartitionAddressCache.GetAsync(
                key: partitionKeyRangeIdentity,
                singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync(
                    request,
                    cachedAddresses: null,
                    partitionKeyRangeIdentity.CollectionRid,
                    partitionKeyRangeIdentity.PartitionKeyRangeId,
                    forceRefresh: false),
                forceRefresh: (_) => false);
        }

        // Always save the hash code. A later forced refresh on this request compares against it to detect
        // whether another request already updated the cache, avoiding unnecessary refreshes.
        if (request.RequestContext != null)
        {
            request.RequestContext.LastPartitionAddressInformationHashCode = addresses.GetHashCode();
        }

        // Record (once) that this range currently has fewer replicas than the configured maximum.
        int targetReplicaSetSize = this.serviceConfigReader.UserReplicationPolicy.MaxReplicaSetSize;
        if (addresses.AllAddresses.Count() < targetReplicaSetSize)
        {
            this.suboptimalServerPartitionTimestamps.TryAdd(partitionKeyRangeIdentity, DateTime.UtcNow);
        }

        // Refresh the cache on demand when some TCP replica address reports that its health status should be
        // revalidated. This does not rely on a 410 (Gone) to trigger the refresh: an address marked unhealthy is
        // deprioritized by the address enumerator and moved to the end of the transport URI list, so requests may
        // not land on it for an extended period, and the 410-driven forceRefresh workflow may never fire for it.
        if (addresses
            .Get(Protocol.Tcp)
            .ReplicaTransportAddressUris
            .Any(x => x.ShouldRefreshHealthStatus()))
        {
            // Zero timeout: if another caller holds the semaphore, skip rather than wait.
            bool slimAcquired = await this.semaphore.WaitAsync(0);
            try
            {
                if (slimAcquired)
                {
                    // Not awaited here; this call only requests the cache refresh.
                    this.serverPartitionAddressCache.Refresh(
                        key: partitionKeyRangeIdentity,
                        singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync(
                            request,
                            cachedAddresses: currentCachedValue,
                            partitionKeyRangeIdentity.CollectionRid,
                            partitionKeyRangeIdentity.PartitionKeyRangeId,
                            forceRefresh: true));
                }
                else
                {
                    DefaultTrace.TraceVerbose("Failed to refresh addresses in the background for the collection rid: {0}, partition key range id: {1}, because the semaphore is already acquired. '{2}'",
                        partitionKeyRangeIdentity.CollectionRid,
                        partitionKeyRangeIdentity.PartitionKeyRangeId,
                        System.Diagnostics.Trace.CorrelationManager.ActivityId);
                }
            }
            finally
            {
                if (slimAcquired)
                {
                    this.semaphore.Release();
                }
            }
        }

        return addresses;
    }
    catch (DocumentClientException ex) when (
        (ex.StatusCode == HttpStatusCode.NotFound) ||
        (ex.StatusCode == HttpStatusCode.Gone && ex.GetSubStatus() == SubStatusCodes.PartitionKeyRangeGone))
    {
        // The collection + partition key range combination is gone; drop any suboptimal-tracking entry for it.
        this.suboptimalServerPartitionTimestamps.TryRemove(partitionKeyRangeIdentity, out _);
        return null;
    }
    catch (Exception)
    {
        // Every other failure lands here, including DocumentClientExceptions not matched by the filter above.
        // If this call performed a forced refresh (possibly after claiming the DateTime.MaxValue sentinel),
        // remove the entry. Otherwise the sentinel would linger and suppress later suboptimal-replica refreshes.
        if (forceRefreshPartitionAddresses)
        {
            this.suboptimalServerPartitionTimestamps.TryRemove(partitionKeyRangeIdentity, out _);
        }

        throw;
    }
}