Microsoft.Azure.Cosmos/src/Query/v3Query/CosmosQueryClientCore.cs (474 lines of code) (raw):

//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Globalization;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Cosmos.Common;
    using Microsoft.Azure.Cosmos.CosmosElements;
    using Microsoft.Azure.Cosmos.Json;
    using Microsoft.Azure.Cosmos.Query.Core;
    using Microsoft.Azure.Cosmos.Query.Core.Metrics;
    using Microsoft.Azure.Cosmos.Query.Core.Monads;
    using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
    using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
    using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
    using Microsoft.Azure.Cosmos.Routing;
    using Microsoft.Azure.Cosmos.Tracing;
    using Microsoft.Azure.Cosmos.Tracing.TraceData;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.Routing;
    using Newtonsoft.Json;
    using static Microsoft.Azure.Documents.RuntimeConstants;

    /// <summary>
    /// <see cref="CosmosQueryClient"/> implementation that serves query-related requests
    /// (query pages, query plans, partition key range lookups, cache refreshes) for one container
    /// by delegating to a <see cref="CosmosClientContext"/> and its <see cref="DocumentClient"/>.
    /// </summary>
    internal class CosmosQueryClientCore : CosmosQueryClient
    {
        /// <summary>Response header that carries the JSON-serialized <see cref="CosmosQueryExecutionInfo"/>.</summary>
        private const string QueryExecutionInfoHeader = "x-ms-cosmos-query-execution-info";

        private readonly CosmosClientContext clientContext;
        private readonly ContainerInternal cosmosContainerCore;
        private readonly DocumentClient documentClient;

        /// <summary>
        /// Creates a query client bound to the given client context and container.
        /// </summary>
        /// <param name="clientContext">Client context used to send requests and reach the <see cref="DocumentClient"/>.</param>
        /// <param name="cosmosContainerCore">Container that the issued requests target; stored as-is and may be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="clientContext"/> is null.</exception>
        public CosmosQueryClientCore(
            CosmosClientContext clientContext,
            ContainerInternal cosmosContainerCore)
        {
            // ArgumentNullException (a subclass of ArgumentException) puts the name in ParamName rather than in the message.
            this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext));
            this.cosmosContainerCore = cosmosContainerCore;
            this.documentClient = this.clientContext.DocumentClient;
        }

        /// <summary>Forwards the scalar-query callback configured on the underlying <see cref="DocumentClient"/>.</summary>
        public override Action<IQueryable> OnExecuteScalarQueryCallback => this.documentClient.OnExecuteScalarQueryCallback;

        /// <summary>
        /// Reads the cached container properties and packages the values that query execution needs.
        /// When <paramref name="partitionKey"/> is supplied, this also computes the single effective partition key
        /// range the key maps to.
        /// </summary>
        /// <param name="containerLink">Link of the container whose properties are read.</param>
        /// <param name="partitionKey">Optional partition key; <see cref="PartitionKey.None"/> is resolved against the container.</param>
        /// <param name="trace">Trace passed to the container-properties lookup.</param>
        /// <param name="cancellationToken">Token passed to the container-properties lookup.</param>
        /// <returns>
        /// The container's query properties. The effective range list is null when no partition key is given.
        /// </returns>
        public override async Task<ContainerQueryProperties> GetCachedContainerQueryPropertiesAsync(
            string containerLink,
            PartitionKey? partitionKey,
            ITrace trace,
            CancellationToken cancellationToken)
        {
            ContainerProperties containerProperties = await this.clientContext.GetCachedContainerPropertiesAsync(
                containerLink,
                trace,
                cancellationToken);

            List<Range<string>> effectivePartitionKeyRange = null;
            if (partitionKey != null)
            {
                // Dis-ambiguate the NonePK if used
                PartitionKeyInternal partitionKeyInternal = partitionKey.Value.IsNone
                    ? containerProperties.GetNoneValue()
                    : partitionKey.Value.InternalKey;

                // A single logical key maps to the degenerate range [key, key].
                effectivePartitionKeyRange = new List<Range<string>>
                {
                    PartitionKeyInternal.GetEffectivePartitionKeyRange(
                        containerProperties.PartitionKey,
                        new Range<PartitionKeyInternal>(
                            min: partitionKeyInternal,
                            max: partitionKeyInternal,
                            isMinInclusive: true,
                            isMaxInclusive: true))
                };
            }

            return new ContainerQueryProperties(
                containerProperties.ResourceId,
                effectivePartitionKeyRange,
                containerProperties.PartitionKey,
                containerProperties.VectorEmbeddingPolicy,
                containerProperties.GeospatialConfig.GeospatialType);
        }

        /// <summary>
        /// Computes the partitioned query execution info (query plan) locally.
        /// The query spec is serialized to JSON and handed to the <see cref="DocumentClient"/>'s query partition provider.
        /// </summary>
        /// <returns>The provider's result: either the execution info or the captured exception.</returns>
        /// <remarks>
        /// <paramref name="cancellationToken"/> is not observed by this implementation.
        /// The provider's <c>hybridSearchSkipOrderByRewrite</c> flag is the negation of
        /// <paramref name="isHybridSearchQueryPlanOptimizationDisabled"/>.
        /// </remarks>
        public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartitionedQueryExecutionInfoAsync(
            SqlQuerySpec sqlQuerySpec,
            ResourceType resourceType,
            PartitionKeyDefinition partitionKeyDefinition,
            VectorEmbeddingPolicy vectorEmbeddingPolicy,
            bool requireFormattableOrderByQuery,
            bool isContinuationExpected,
            bool allowNonValueAggregateQuery,
            bool hasLogicalPartitionKey,
            bool allowDCount,
            bool useSystemPrefix,
            bool isHybridSearchQueryPlanOptimizationDisabled,
            Cosmos.GeospatialType geospatialType,
            CancellationToken cancellationToken)
        {
            string queryString = null;
            if (sqlQuerySpec != null)
            {
                using (Stream stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, resourceType))
                {
                    using (StreamReader reader = new StreamReader(stream))
                    {
                        queryString = reader.ReadToEnd();
                    }
                }
            }

            return (await this.documentClient.QueryPartitionProvider).TryGetPartitionedQueryExecutionInfo(
                querySpecJsonString: queryString,
                partitionKeyDefinition: partitionKeyDefinition,
                vectorEmbeddingPolicy: vectorEmbeddingPolicy,
                requireFormattableOrderByQuery: requireFormattableOrderByQuery,
                isContinuationExpected: isContinuationExpected,
                allowNonValueAggregateQuery: allowNonValueAggregateQuery,
                hasLogicalPartitionKey: hasLogicalPartitionKey,
                allowDCount: allowDCount,
                useSystemPrefix: useSystemPrefix,
                hybridSearchSkipOrderByRewrite: !isHybridSearchQueryPlanOptimizationDisabled,
                geospatialType: geospatialType);
        }

        /// <summary>
        /// Sends a single query page request and converts the response into a <see cref="QueryPage"/>.
        /// </summary>
        /// <param name="requestOptions">
        /// Request options; note that <see cref="QueryRequestOptions.MaxItemCount"/> is overwritten with <paramref name="pageSize"/>.
        /// </param>
        /// <param name="additionalRequestHeaders">Supplies the continuation-expected, correlated-activity-id and ODE headers.</param>
        /// <param name="continuationToken">Continuation token written onto the request.</param>
        /// <returns>The parsed page, or the <see cref="CosmosException"/> describing a non-success response.</returns>
        public override async Task<TryCatch<QueryPage>> ExecuteItemQueryAsync(
            string resourceUri,
            ResourceType resourceType,
            OperationType operationType,
            FeedRange feedRange,
            QueryRequestOptions requestOptions,
            AdditionalRequestHeaders additionalRequestHeaders,
            SqlQuerySpec sqlQuerySpec,
            string continuationToken,
            int pageSize,
            ITrace trace,
            CancellationToken cancellationToken)
        {
            requestOptions.MaxItemCount = pageSize;

            ResponseMessage message = await this.clientContext.ProcessResourceOperationStreamAsync(
                resourceUri: resourceUri,
                resourceType: resourceType,
                operationType: operationType,
                requestOptions: requestOptions,
                feedRange: feedRange,
                cosmosContainerCore: this.cosmosContainerCore,
                streamPayload: this.clientContext.SerializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, resourceType),
                requestEnricher: (cosmosRequestMessage) =>
                {
                    cosmosRequestMessage.Headers.Add(
                        HttpConstants.HttpHeaders.IsContinuationExpected,
                        additionalRequestHeaders.IsContinuationExpected.ToString());
                    QueryRequestOptions.FillContinuationToken(
                        cosmosRequestMessage,
                        continuationToken);
                    cosmosRequestMessage.Headers.Add(HttpConstants.HttpHeaders.ContentType, MediaTypes.QueryJson);
                    cosmosRequestMessage.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString);
                    cosmosRequestMessage.Headers.Add(WFConstants.BackendHeaders.CorrelatedActivityId, additionalRequestHeaders.CorrelatedActivityId.ToString());
                    cosmosRequestMessage.Headers.Add(HttpConstants.HttpHeaders.OptimisticDirectExecute, additionalRequestHeaders.OptimisticDirectExecute.ToString());
                },
                trace: trace,
                cancellationToken: cancellationToken);

            // GetCosmosElementResponse takes ownership of (and disposes) the response message.
            return CosmosQueryClientCore.GetCosmosElementResponse(
                resourceType,
                message,
                trace);
        }

        /// <summary>
        /// Requests a query plan from the service in gateway mode and deserializes it.
        /// </summary>
        /// <param name="partitionKey">When set, the request is scoped to this logical partition.</param>
        /// <param name="supportedQueryFeatures">Value sent in the supported-query-features header.</param>
        /// <param name="clientQueryCorrelationId">Value sent as the correlated activity id.</param>
        /// <returns>The deserialized <see cref="PartitionedQueryExecutionInfo"/>.</returns>
        /// <exception cref="CosmosException">The service returned a non-success status code.</exception>
        public override async Task<PartitionedQueryExecutionInfo> ExecuteQueryPlanRequestAsync(
            string resourceUri,
            ResourceType resourceType,
            OperationType operationType,
            SqlQuerySpec sqlQuerySpec,
            PartitionKey? partitionKey,
            string supportedQueryFeatures,
            Guid clientQueryCorrelationId,
            ITrace trace,
            CancellationToken cancellationToken)
        {
            PartitionedQueryExecutionInfo partitionedQueryExecutionInfo;
            using (ResponseMessage message = await this.clientContext.ProcessResourceOperationStreamAsync(
                resourceUri: resourceUri,
                resourceType: resourceType,
                operationType: operationType,
                requestOptions: null,
                feedRange: partitionKey.HasValue ? new FeedRangePartitionKey(partitionKey.Value) : null,
                cosmosContainerCore: this.cosmosContainerCore,
                streamPayload: this.clientContext.SerializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, resourceType),
                requestEnricher: (requestMessage) =>
                {
                    requestMessage.Headers.Add(HttpConstants.HttpHeaders.ContentType, RuntimeConstants.MediaTypes.QueryJson);
                    requestMessage.Headers.Add(HttpConstants.HttpHeaders.IsQueryPlanRequest, bool.TrueString);
                    requestMessage.Headers.Add(HttpConstants.HttpHeaders.SupportedQueryFeatures, supportedQueryFeatures);
                    requestMessage.Headers.Add(HttpConstants.HttpHeaders.QueryVersion, new Version(major: 1, minor: 0).ToString());
                    requestMessage.Headers.Add(WFConstants.BackendHeaders.CorrelatedActivityId, clientQueryCorrelationId.ToString());
                    requestMessage.UseGatewayMode = true;
                },
                trace: trace,
                cancellationToken: cancellationToken))
            {
                // Syntax exception are argument exceptions and thrown to the user.
                message.EnsureSuccessStatusCode();
                partitionedQueryExecutionInfo = this.clientContext.SerializerCore.FromStream<PartitionedQueryExecutionInfo>(message.Content);
            }

            return partitionedQueryExecutionInfo;
        }

        /// <summary>Returns the ClientDisableOptimisticDirectExecution flag exposed by the query partition provider.</summary>
        public override async Task<bool> GetClientDisableOptimisticDirectExecutionAsync()
        {
            QueryPartitionProvider provider = await this.clientContext.DocumentClient.QueryPartitionProvider;
            return provider.ClientDisableOptimisticDirectExecution;
        }

        /// <summary>
        /// Resolves <paramref name="feedRangeInternal"/> to effective ranges and returns the partition key ranges overlapping them.
        /// </summary>
        public override async Task<List<PartitionKeyRange>> GetTargetPartitionKeyRangeByFeedRangeAsync(
            string resourceLink,
            string collectionResourceId,
            PartitionKeyDefinition partitionKeyDefinition,
            FeedRangeInternal feedRangeInternal,
            bool forceRefresh,
            ITrace trace)
        {
            using (ITrace childTrace = trace.StartChild("Get Overlapping Feed Ranges", TraceComponent.Routing, Tracing.TraceLevel.Info))
            {
                IRoutingMapProvider routingMapProvider = await this.GetRoutingMapProviderAsync();

                // NOTE(review): effective ranges are resolved under the parent `trace`, not `childTrace`; confirm this is intended
                // before changing it, since it alters the recorded trace tree.
                List<Range<string>> ranges = await feedRangeInternal.GetEffectiveRangesAsync(routingMapProvider, collectionResourceId, partitionKeyDefinition, trace);

                return await this.GetTargetPartitionKeyRangesAsync(
                    resourceLink,
                    collectionResourceId,
                    ranges,
                    forceRefresh,
                    childTrace);
            }
        }

        /// <summary>
        /// Returns the partition key ranges of <paramref name="collectionResourceId"/> that overlap <paramref name="providedRanges"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="collectionResourceId"/> is null/empty, or <paramref name="providedRanges"/> is null, empty, or contains null.
        /// </exception>
        /// <exception cref="NotFoundException">The routing map returned no ranges (stale cache).</exception>
        public override async Task<List<PartitionKeyRange>> GetTargetPartitionKeyRangesAsync(
            string resourceLink,
            string collectionResourceId,
            IReadOnlyList<Range<string>> providedRanges,
            bool forceRefresh,
            ITrace trace)
        {
            if (string.IsNullOrEmpty(collectionResourceId))
            {
                throw new ArgumentNullException(nameof(collectionResourceId));
            }

            if (providedRanges == null ||
                providedRanges.Count == 0 ||
                providedRanges.Any(x => x == null))
            {
                throw new ArgumentNullException(nameof(providedRanges));
            }

            using (ITrace getPKRangesTrace = trace.StartChild("Get Partition Key Ranges", TraceComponent.Routing, Tracing.TraceLevel.Info))
            {
                IRoutingMapProvider routingMapProvider = await this.GetRoutingMapProviderAsync();

                // NOTE(review): `forceRefresh` is not forwarded to the routing map lookup here; confirm whether that is intentional.
                List<PartitionKeyRange> ranges = await routingMapProvider.TryGetOverlappingRangesAsync(collectionResourceId, providedRanges, getPKRangesTrace);
                if (ranges == null && PathsHelper.IsNameBased(resourceLink))
                {
                    // Refresh the cache and don't try to re-resolve collection as it is not clear what already
                    // happened based on previously resolved collection rid.
                    // Return NotFoundException this time. Next query will succeed.
                    // This can only happen if collection is deleted/created with same name and client was not restarted
                    // in between.
                    CollectionCache collectionCache = await this.documentClient.GetCollectionCacheAsync(getPKRangesTrace);
                    collectionCache.Refresh(resourceLink);
                }

                if (ranges == null)
                {
                    throw new NotFoundException($"{DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture)}: GetTargetPartitionKeyRanges(collectionResourceId:{collectionResourceId}, providedRanges: {string.Join(",", providedRanges)}) failed due to stale cache");
                }

                return ranges;
            }
        }

        /// <summary>Delegates to <see cref="CustomTypeExtensions.ByPassQueryParsing"/>.</summary>
        public override bool BypassQueryParsing()
        {
            return CustomTypeExtensions.ByPassQueryParsing();
        }

        /// <summary>Clears the session tokens stored for <paramref name="collectionFullName"/>.</summary>
        public override void ClearSessionTokenCache(string collectionFullName)
        {
            ISessionContainer sessionContainer = this.clientContext.DocumentClient.sessionContainer;
            sessionContainer.ClearTokenByCollectionFullname(collectionFullName);
        }

        /// <summary>
        /// Converts a query response message into a <see cref="QueryPage"/>, or into the exception describing the failure.
        /// The response message is always disposed. When the response carries query metrics text, a lazily-parsed
        /// metrics datum is added to <paramref name="trace"/>.
        /// </summary>
        private static TryCatch<QueryPage> GetCosmosElementResponse(
            ResourceType resourceType,
            ResponseMessage cosmosResponseMessage,
            ITrace trace)
        {
            using (ITrace getCosmosElementResponse = trace.StartChild("Get Cosmos Element Response", TraceComponent.Json, Tracing.TraceLevel.Info))
            {
                using (cosmosResponseMessage)
                {
                    if (cosmosResponseMessage.Headers.QueryMetricsText != null)
                    {
                        QueryMetricsTraceDatum datum = new QueryMetricsTraceDatum(
                            new Lazy<QueryMetrics>(() => new QueryMetrics(
                                cosmosResponseMessage.Headers.QueryMetricsText,
                                IndexUtilizationInfo.Empty,
                                ClientSideMetrics.Empty)));
                        trace.AddDatum("Query Metrics", datum);
                    }

                    if (!cosmosResponseMessage.IsSuccessStatusCode)
                    {
                        // Prefer the exception already attached to the message; otherwise build one from the response.
                        CosmosException exception = cosmosResponseMessage.CosmosException ?? new CosmosException(
                            cosmosResponseMessage.ErrorMessage,
                            cosmosResponseMessage.StatusCode,
                            (int)cosmosResponseMessage.Headers.SubStatusCode,
                            cosmosResponseMessage.Headers.ActivityId,
                            cosmosResponseMessage.Headers.RequestCharge);
                        return TryCatch<QueryPage>.FromException(exception);
                    }

                    return CreateQueryPage(
                        cosmosResponseMessage.Headers,
                        cosmosResponseMessage.Content,
                        resourceType);
                }
            }
        }

        /// <summary>
        /// Builds a <see cref="QueryPage"/> from a successful query response's headers and body.
        /// </summary>
        /// <param name="headers">Response headers (continuation, charge, activity id, execution info, ...).</param>
        /// <param name="content">Response body; any stream type is accepted and buffered by <see cref="ParseRestStream"/>.</param>
        /// <param name="resourceType">Resource type used to locate the result array in the body.</param>
        /// <returns>The page wrapped in a successful <see cref="TryCatch{TResult}"/>.</returns>
        internal static TryCatch<QueryPage> CreateQueryPage(
            Headers headers,
            Stream content,
            ResourceType resourceType)
        {
            // ParseRestStream already copies non-MemoryStream input into a MemoryStream, so no pre-buffering is needed here.
            CosmosQueryClientCore.ParseRestStream(
                content,
                resourceType,
                out CosmosArray documents,
                out CosmosObject distributionPlan,
                out bool? streaming);

            DistributionPlanSpec distributionPlanSpec = null;

            // ISSUE-TODO-adityasa-2024/1/31 - Uncomment this when distributionPlanSpec is hooked with rest of the code so that it can be tested.
            // if (distributionPlan != null)
            // {
            //     bool backendPlan = distributionPlan.TryGetValue("backendDistributionPlan", out CosmosElement backendDistributionPlan);
            //     bool clientPlan = distributionPlan.TryGetValue("clientDistributionPlan", out CosmosElement clientDistributionPlan);
            //     Debug.Assert(clientPlan == backendPlan, "Response Body Contract was violated. Out of the backend and client plans, only one is present in the distribution plan.");
            //     if (backendPlan && clientPlan)
            //     {
            //         distributionPlanSpec = new DistributionPlanSpec(backendDistributionPlan.ToString(), clientDistributionPlan.ToString());
            //     }
            // }

            QueryState queryState;
            if (headers.ContinuationToken != null)
            {
                queryState = new QueryState(CosmosString.Create(headers.ContinuationToken));
            }
            else
            {
                queryState = default;
            }

            // Forward every response header except those QueryPage explicitly excludes.
            Dictionary<string, string> additionalHeaders = new Dictionary<string, string>();
            foreach (string key in headers)
            {
                if (!QueryPage.BannedHeaders.Contains(key))
                {
                    additionalHeaders[key] = headers[key];
                }
            }

            // Execution info is only deserialized if a consumer actually asks for it.
            Lazy<CosmosQueryExecutionInfo> cosmosQueryExecutionInfo = default;
            if (headers.TryGetValue(QueryExecutionInfoHeader, out string queryExecutionInfoString))
            {
                cosmosQueryExecutionInfo =
                    new Lazy<CosmosQueryExecutionInfo>(
                        () => JsonConvert.DeserializeObject<CosmosQueryExecutionInfo>(queryExecutionInfoString));
            }

            QueryPage response = new QueryPage(
                documents,
                headers.RequestCharge,
                headers.ActivityId,
                cosmosQueryExecutionInfo,
                distributionPlanSpec,
                disallowContinuationTokenMessage: null,
                additionalHeaders,
                queryState,
                streaming);

            return TryCatch<QueryPage>.FromResult(response);
        }

        /// <summary>
        /// Routes a partitioned request to <paramref name="partitionKeyRangeIdentity"/> unless it already carries a logical partition key.
        /// NOTE(review): this private method is not referenced anywhere in this class.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
        private void PopulatePartitionKeyRangeInfo(
            RequestMessage request,
            PartitionKeyRangeIdentity partitionKeyRangeIdentity)
        {
            if (request == null)
            {
                throw new ArgumentNullException(nameof(request));
            }

            if (request.ResourceType.IsPartitioned())
            {
                // If the request already has the logical partition key,
                // then we shouldn't add the physical partition key range id.

                bool hasPartitionKey = request.Headers.PartitionKey != null;
                if (!hasPartitionKey)
                {
                    request
                        .ToDocumentServiceRequest()
                        .RouteTo(partitionKeyRangeIdentity);
                }
            }
        }

        /// <summary>
        /// Clears session tokens for <paramref name="collectionLink"/> and re-resolves the collection with a forced name-cache refresh.
        /// </summary>
        public override async Task ForceRefreshCollectionCacheAsync(string collectionLink, CancellationToken cancellationToken)
        {
            this.ClearSessionTokenCache(collectionLink);

            CollectionCache collectionCache = await this.documentClient.GetCollectionCacheAsync(NoOpTrace.Singleton);
            using (Documents.DocumentServiceRequest request = Documents.DocumentServiceRequest.Create(
               Documents.OperationType.Query,
               Documents.ResourceType.Collection,
               collectionLink,
               Documents.AuthorizationTokenType.Invalid)) //this request doesn't actually go to server
            {
                request.ForceNameCacheRefresh = true;
                await collectionCache.ResolveCollectionAsync(request, cancellationToken, NoOpTrace.Singleton);
            }
        }

        /// <summary>Returns the partition key ranges overlapping <paramref name="range"/>, optionally forcing a cache refresh.</summary>
        public override async Task<IReadOnlyList<PartitionKeyRange>> TryGetOverlappingRangesAsync(
            string collectionResourceId,
            Range<string> range,
            bool forceRefresh = false)
        {
            PartitionKeyRangeCache partitionKeyRangeCache = await this.GetRoutingMapProviderAsync();
            return await partitionKeyRangeCache.TryGetOverlappingRangesAsync(
                collectionResourceId,
                range,
                NoOpTrace.Singleton,
                forceRefresh);
        }

        /// <summary>Fetches the <see cref="DocumentClient"/>'s partition key range cache.</summary>
        private Task<PartitionKeyRangeCache> GetRoutingMapProviderAsync()
        {
            return this.documentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton);
        }

        /// <summary>
        /// Parses the body of a query REST response into its result array and the optional
        /// <c>_distributionPlan</c> and <c>_streaming</c> values.
        /// </summary>
        /// <param name="stream">The query REST response body; non-<see cref="MemoryStream"/> input is copied into one first.</param>
        /// <param name="resourceType">The resource type; selects the result array property (e.g. "Documents", "DocumentCollections").</param>
        /// <param name="documents">An array of CosmosElements parsed from the response body</param>
        /// <param name="distributionPlan">The distribution plan object (decoded from base64 binary JSON if needed), or null when absent.</param>
        /// <param name="streaming">The backend's <c>_streaming</c> flag, or null when absent.</param>
        /// <exception cref="InvalidDataException">The stream cannot be read.</exception>
        /// <exception cref="InvalidOperationException">The body violates the expected response contract.</exception>
        public static void ParseRestStream(
            Stream stream,
            ResourceType resourceType,
            out CosmosArray documents,
            out CosmosObject distributionPlan,
            out bool? streaming)
        {
            if (!(stream is MemoryStream memoryStream))
            {
                memoryStream = new MemoryStream();
                stream.CopyTo(memoryStream);
            }

            if (!memoryStream.CanRead)
            {
                throw new InvalidDataException("Stream can not be read");
            }

            // Parse out the document from the REST response like this:
            // {
            //    "_rid": "qHVdAImeKAQ=",
            //    "Documents": [{
            //        "id": "03230",
            //        "_rid": "qHVdAImeKAQBAAAAAAAAAA==",
            //        "_self": "dbs\/qHVdAA==\/colls\/qHVdAImeKAQ=\/docs\/qHVdAImeKAQBAAAAAAAAAA==\/",
            //        "_etag": "\"410000b0-0000-0000-0000-597916b00000\"",
            //        "_attachments": "attachments\/",
            //        "_ts": 1501107886
            //    }],
            //    "_count": 1,
            //    "_distributionPlan": {
            //        "backendDistributionPlan": {
            //            "query": "\nSELECT Count(r.a) AS count_a\nFROM r",
            //            "obfuscatedQuery": "{\"query\":\"SELECT Count(r.a) AS p1\\nFROM r\",\"parameters\":[]}",
            //            "shape": "{\"Select\":{\"Type\":\"List\",\"AggCount\":1},\"From\":{\"Expr\":\"Aliased\"}}",
            //            "signature":-4885972563975185329,
            //            "shapeSignature":-6171928203673877984,
            //            "queryIL": {...},
            //            "noSpatial": true,
            //            "language": "QueryIL"
            //        },
            //        "coordinatorDistributionPlan": {
            //            "clientQL": {
            //                "Kind": "Input",
            //                "Name": "root"
            //            }
            //        }
            //    },
            //    "_streaming": true
            // }
            // You want to create a CosmosElement for each document in "Documents".

            // TryGetBuffer avoids a copy when the MemoryStream's buffer is publicly visible.
            ReadOnlyMemory<byte> content = memoryStream.TryGetBuffer(out ArraySegment<byte> buffer) ? buffer : (ReadOnlyMemory<byte>)memoryStream.ToArray();
            IJsonNavigator jsonNavigator = JsonNavigator.Create(content);

            string resourceName = resourceType switch
            {
                ResourceType.Collection => "DocumentCollections",
                _ => resourceType.ToResourceTypeString() + "s",
            };

            if (!jsonNavigator.TryGetObjectProperty(
                jsonNavigator.GetRootNode(),
                resourceName,
                out ObjectProperty objectProperty))
            {
                throw new InvalidOperationException($"Response Body Contract was violated. QueryResponse did not have property: {resourceName}");
            }

            if (!(CosmosElement.Dispatch(
                jsonNavigator,
                objectProperty.ValueNode) is CosmosArray cosmosArray))
            {
                throw new InvalidOperationException($"QueryResponse did not have an array of : {resourceName}");
            }

            documents = cosmosArray;

            if (resourceType == ResourceType.Document && jsonNavigator.TryGetObjectProperty(jsonNavigator.GetRootNode(), "_distributionPlan", out ObjectProperty distributionPlanObjectProperty))
            {
                switch (CosmosElement.Dispatch(jsonNavigator, distributionPlanObjectProperty.ValueNode))
                {
                    // A string-valued plan is base64-encoded binary JSON.
                    case CosmosString binaryDistributionPlan:
                        byte[] binaryJson = Convert.FromBase64String(binaryDistributionPlan.Value);
                        IJsonNavigator binaryJsonNavigator = JsonNavigator.Create(binaryJson);
                        IJsonNavigatorNode binaryJsonNavigatorNode = binaryJsonNavigator.GetRootNode();
                        distributionPlan = CosmosObject.Create(binaryJsonNavigator, binaryJsonNavigatorNode);
                        break;
                    case CosmosObject textDistributionPlan:
                        distributionPlan = textDistributionPlan;
                        break;
                    default:
                        // The property exists but is neither a string nor an object; report the actual node type.
                        throw new InvalidOperationException($"Response Body Contract was violated. QueryResponse had _distributionPlan property as neither a string nor an object: {jsonNavigator.GetNodeType(distributionPlanObjectProperty.ValueNode)}");
                }
            }
            else
            {
                distributionPlan = null;
            }

            if (resourceType == ResourceType.Document && jsonNavigator.TryGetObjectProperty(jsonNavigator.GetRootNode(), "_streaming", out ObjectProperty streamingProperty))
            {
                JsonNodeType jsonNodeType = jsonNavigator.GetNodeType(streamingProperty.ValueNode);
                streaming = jsonNodeType switch
                {
                    JsonNodeType.False => false,
                    JsonNodeType.True => true,
                    _ => throw new InvalidOperationException($"Response Body Contract was violated. QueryResponse had _streaming property as a non boolean: {jsonNodeType}"),
                };
            }
            else
            {
                streaming = null;
            }
        }
    }
}