Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs (272 lines of code) (raw):

//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ namespace Microsoft.Azure.Cosmos.Routing { using System; using System.Collections.Generic; using System.Diagnostics; using System.Globalization; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Cosmos.Tracing.TraceData; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Collections; using Microsoft.Azure.Documents.Routing; internal class PartitionKeyRangeCache : IRoutingMapProvider, ICollectionRoutingMapCache { private const string PageSizeString = "-1"; private readonly AsyncCacheNonBlocking<string, CollectionRoutingMap> routingMapCache; private readonly ICosmosAuthorizationTokenProvider authorizationTokenProvider; private readonly IStoreModel storeModel; private readonly CollectionCache collectionCache; private readonly IGlobalEndpointManager endpointManager; public PartitionKeyRangeCache( ICosmosAuthorizationTokenProvider authorizationTokenProvider, IStoreModel storeModel, CollectionCache collectionCache, IGlobalEndpointManager endpointManager, bool enableAsyncCacheExceptionNoSharing = true) { this.routingMapCache = new AsyncCacheNonBlocking<string, CollectionRoutingMap>( keyEqualityComparer: StringComparer.Ordinal, enableAsyncCacheExceptionNoSharing: enableAsyncCacheExceptionNoSharing); this.authorizationTokenProvider = authorizationTokenProvider; this.storeModel = storeModel; this.collectionCache = collectionCache; this.endpointManager = endpointManager; } public virtual async Task<IReadOnlyList<PartitionKeyRange>> TryGetOverlappingRangesAsync( string collectionRid, Range<string> range, ITrace trace, bool forceRefresh = false) { using (ITrace childTrace = trace.StartChild("Try Get Overlapping Ranges", TraceComponent.Routing, Tracing.TraceLevel.Info)) { Debug.Assert(ResourceId.TryParse(collectionRid, out ResourceId collectionRidParsed), "Could not parse CollectionRid from ResourceId."); CollectionRoutingMap routingMap = await this.TryLookupAsync( collectionRid: collectionRid, previousValue: null, request: null, trace: childTrace); if (forceRefresh && routingMap != null) { routingMap = await this.TryLookupAsync( collectionRid: collectionRid, previousValue: routingMap, request: null, trace: childTrace); } if (routingMap == null) { DefaultTrace.TraceWarning(string.Format("Routing Map Null for collection: {0} for range: {1}, forceRefresh:{2}", collectionRid, range.ToString(), forceRefresh)); return null; } return routingMap.GetOverlappingRanges(range); } } public virtual async Task<PartitionKeyRange> TryGetPartitionKeyRangeByIdAsync( string collectionResourceId, string partitionKeyRangeId, ITrace trace, bool forceRefresh = false) { Debug.Assert(ResourceId.TryParse(collectionResourceId, out _), "Could not parse CollectionRid from ResourceId."); CollectionRoutingMap routingMap = await this.TryLookupAsync( collectionRid: collectionResourceId, previousValue: null, request: null, trace: trace); if (forceRefresh && routingMap != null) { routingMap = await this.TryLookupAsync( collectionRid: collectionResourceId, previousValue: routingMap, request: null, trace: trace); } if (routingMap == null) { DefaultTrace.TraceInformation(string.Format("Routing Map Null for collection: {0}, PartitionKeyRangeId: {1}, forceRefresh:{2}", collectionResourceId, partitionKeyRangeId, forceRefresh)); return null; } return routingMap.TryGetRangeByPartitionKeyRangeId(partitionKeyRangeId); } public virtual async Task<CollectionRoutingMap> TryLookupAsync( string collectionRid, CollectionRoutingMap previousValue, DocumentServiceRequest request, ITrace trace) { try { return await this.routingMapCache.GetAsync( key: collectionRid, singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync( collectionRid: collectionRid, previousRoutingMap: previousValue, trace: trace, clientSideRequestStatistics: request?.RequestContext?.ClientRequestStatistics), forceRefresh: (currentValue) => PartitionKeyRangeCache.ShouldForceRefresh(previousValue, currentValue)); } catch (DocumentClientException ex) { if (previousValue != null) { StringBuilder rangesString = new StringBuilder(); foreach (PartitionKeyRange range in previousValue.OrderedPartitionKeyRanges) { rangesString.Append(range.ToRange().ToString()); rangesString.Append(", "); } DefaultTrace.TraceInformation(string.Format("DocumentClientException in TryLookupAsync Collection: {0}, previousValue: {1} Exception: {2}", collectionRid, rangesString.ToString(), ex.Message)); } if (ex.StatusCode == HttpStatusCode.NotFound) { return null; } throw; } } private static bool ShouldForceRefresh( CollectionRoutingMap previousValue, CollectionRoutingMap currentValue) { // Previous is null then no need to force a refresh // The request didn't access the cache before if (previousValue == null) { return false; } // currentValue is null then the value just got initialized so // is not possible for it to be stale if (currentValue == null) { return false; } // CollectionRoutingMap uses changefeed to update the cache. The ChangeFeedNextIfNoneMatch // is the continuation token for the changefeed operation. If the values do not match // then another operation has already refresh the cache since this request was sent. So // there is no reason to do another refresh. return previousValue.ChangeFeedNextIfNoneMatch == currentValue.ChangeFeedNextIfNoneMatch; } private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync( string collectionRid, CollectionRoutingMap previousRoutingMap, ITrace trace, IClientSideRequestStatistics clientSideRequestStatistics) { List<PartitionKeyRange> ranges = new List<PartitionKeyRange>(); string changeFeedNextIfNoneMatch = previousRoutingMap?.ChangeFeedNextIfNoneMatch; HttpStatusCode lastStatusCode = HttpStatusCode.OK; RetryOptions retryOptions = new RetryOptions(); MetadataRequestThrottleRetryPolicy metadataRetryPolicy = new ( endpointManager: this.endpointManager, maxRetryAttemptsOnThrottledRequests: retryOptions.MaxRetryAttemptsOnThrottledRequests, maxRetryWaitTimeInSeconds: retryOptions.MaxRetryWaitTimeInSeconds); do { INameValueCollection headers = new RequestNameValueCollection(); headers.Set(HttpConstants.HttpHeaders.PageSize, PageSizeString); headers.Set(HttpConstants.HttpHeaders.A_IM, HttpConstants.A_IMHeaderValues.IncrementalFeed); if (changeFeedNextIfNoneMatch != null) { headers.Set(HttpConstants.HttpHeaders.IfNoneMatch, changeFeedNextIfNoneMatch); } using (DocumentServiceResponse response = await BackoffRetryUtility<DocumentServiceResponse>.ExecuteAsync( () => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics, metadataRetryPolicy), retryPolicy: metadataRetryPolicy)) { lastStatusCode = response.StatusCode; changeFeedNextIfNoneMatch = response.Headers[HttpConstants.HttpHeaders.ETag]; FeedResource<PartitionKeyRange> feedResource = response.GetResource<FeedResource<PartitionKeyRange>>(); if (feedResource != null) { ranges.AddRange(feedResource); } } } while (lastStatusCode != HttpStatusCode.NotModified); IEnumerable<Tuple<PartitionKeyRange, ServiceIdentity>> tuples = ranges.Select(range => Tuple.Create(range, (ServiceIdentity)null)); CollectionRoutingMap routingMap; if (previousRoutingMap == null) { // Splits could have happened during change feed query and we might have a mix of gone and new ranges. HashSet<string> goneRanges = new HashSet<string>(ranges.SelectMany(range => range.Parents ?? Enumerable.Empty<string>())); routingMap = CollectionRoutingMap.TryCreateCompleteRoutingMap( tuples.Where(tuple => !goneRanges.Contains(tuple.Item1.Id)), string.Empty, changeFeedNextIfNoneMatch); } else { routingMap = previousRoutingMap.TryCombine(tuples, changeFeedNextIfNoneMatch); } if (routingMap == null) { // Range information either doesn't exist or is not complete. throw new NotFoundException($"{DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture)}: GetRoutingMapForCollectionAsync(collectionRid: {collectionRid}), Range information either doesn't exist or is not complete."); } trace.AddDatum($"PKRangeCache Info({previousRoutingMap?.ChangeFeedNextIfNoneMatch}#{DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture)})", new PartitionKeyRangeCacheTraceDatum( previousContinuationToken: previousRoutingMap?.ChangeFeedNextIfNoneMatch, continuationToken: routingMap.ChangeFeedNextIfNoneMatch)); return routingMap; } private async Task<DocumentServiceResponse> ExecutePartitionKeyRangeReadChangeFeedAsync(string collectionRid, INameValueCollection headers, ITrace trace, IClientSideRequestStatistics clientSideRequestStatistics, IDocumentClientRetryPolicy retryPolicy) { using (ITrace childTrace = trace.StartChild("Read PartitionKeyRange Change Feed", TraceComponent.Transport, Tracing.TraceLevel.Info)) { using (DocumentServiceRequest request = DocumentServiceRequest.Create( OperationType.ReadFeed, collectionRid, ResourceType.PartitionKeyRange, AuthorizationTokenType.PrimaryMasterKey, headers)) { retryPolicy.OnBeforeSendRequest(request); string authorizationToken = null; try { authorizationToken = await this.authorizationTokenProvider.GetUserAuthorizationTokenAsync( request.ResourceAddress, PathsHelper.GetResourcePath(request.ResourceType), HttpConstants.HttpMethods.Get, request.Headers, AuthorizationTokenType.PrimaryMasterKey, childTrace); } catch (UnauthorizedException) { } if (authorizationToken == null) { // User doesn't have rid based resource token. Maybe he has name based. throw new NotSupportedException("Resource tokens are not supported"); ////CosmosContainerSettings collection = await this.collectionCache.ResolveCollectionAsync(request, CancellationToken.None); ////authorizationToken = //// this.authorizationTokenProvider.GetUserAuthorizationTokenAsync( //// collection.AltLink, //// PathsHelper.GetResourcePath(request.ResourceType), //// HttpConstants.HttpMethods.Get, //// request.Headers, //// AuthorizationTokenType.PrimaryMasterKey); } request.Headers[HttpConstants.HttpHeaders.Authorization] = authorizationToken; request.RequestContext.ClientRequestStatistics = clientSideRequestStatistics ?? new ClientSideRequestStatisticsTraceDatum(DateTime.UtcNow, trace); if (clientSideRequestStatistics == null) { childTrace.AddDatum("Client Side Request Stats", request.RequestContext.ClientRequestStatistics); } using (new ActivityScope(Guid.NewGuid())) { try { return await this.storeModel.ProcessMessageAsync(request); } catch (DocumentClientException ex) { childTrace.AddDatum("Exception Message", ex.Message); throw; } catch (CosmosException ce) { childTrace.AddDatum("Exception Message", ce.Message); throw; } } } } } } }