Microsoft.Azure.Cosmos/src/Linq/DocumentQuery.cs (289 lines of code) (raw):
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Linq
{
using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Metrics;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
using Newtonsoft.Json;
internal sealed class DocumentQuery<T> : IDocumentQuery<T>, IOrderedQueryable<T>
{
public static readonly DocumentFeedResponse<dynamic> EmptyFeedResponse = new DocumentFeedResponse<dynamic>(Enumerable.Empty<dynamic>(), 0, new StoreResponseNameValueCollection());
private readonly IDocumentQueryClient client;
private readonly ResourceType resourceTypeEnum;
private readonly Type resourceType;
private readonly string documentsFeedOrDatabaseLink;
private readonly FeedOptions feedOptions;
private readonly object partitionKey;
private readonly DocumentQueryProvider queryProvider;
private readonly SchedulingStopwatch executeNextAysncMetrics;
private IDocumentQueryExecutionContext queryExecutionContext;
private bool tracedFirstExecution;
private bool tracedLastExecution;
// Root Query.
public DocumentQuery(
IDocumentQueryClient client,
ResourceType resourceTypeEnum,
Type resourceType,
string documentsFeedOrDatabaseLink,
Expression expression,
FeedOptions feedOptions,
object partitionKey = null)
{
if (client == null)
{
throw new ArgumentNullException("client");
}
this.client = client;
this.resourceTypeEnum = resourceTypeEnum;
this.resourceType = resourceType;
this.documentsFeedOrDatabaseLink = documentsFeedOrDatabaseLink;
this.feedOptions = feedOptions == null ? new FeedOptions() : new FeedOptions(feedOptions);
// Swapping out negative values in feedOptions for int.MaxValue
if (this.feedOptions.MaxBufferedItemCount < 0)
{
this.feedOptions.MaxBufferedItemCount = int.MaxValue;
}
if (this.feedOptions.MaxDegreeOfParallelism < 0)
{
this.feedOptions.MaxDegreeOfParallelism = int.MaxValue;
}
if (this.feedOptions.MaxItemCount < 0)
{
this.feedOptions.MaxItemCount = int.MaxValue;
}
this.partitionKey = partitionKey;
this.Expression = expression ?? Expression.Constant(this);
this.queryProvider = new DocumentQueryProvider(
client,
resourceTypeEnum,
resourceType,
documentsFeedOrDatabaseLink,
feedOptions,
partitionKey,
this.client.OnExecuteScalarQueryCallback);
this.executeNextAysncMetrics = new SchedulingStopwatch();
this.executeNextAysncMetrics.Ready();
this.CorrelatedActivityId = Guid.NewGuid();
}
public DocumentQuery(
DocumentClient client,
ResourceType resourceTypeEnum,
Type resourceType,
string documentsFeedOrDatabaseLink,
Expression expression,
FeedOptions feedOptions,
object partitionKey = null)
: this(
new DocumentQueryClient(client),
resourceTypeEnum,
resourceType,
documentsFeedOrDatabaseLink,
expression,
feedOptions,
partitionKey)
{
}
public DocumentQuery(
IDocumentQueryClient client,
ResourceType resourceTypeEnum,
Type resourceType,
string documentsFeedOrDatabaseLink,
FeedOptions feedOptions,
object partitionKey = null)
: this(
client,
resourceTypeEnum,
resourceType,
documentsFeedOrDatabaseLink,
null,
feedOptions,
partitionKey)
{
}
public DocumentQuery(
DocumentClient client,
ResourceType resourceTypeEnum,
Type resourceType,
string documentsFeedOrDatabaseLink,
FeedOptions feedOptions,
object partitionKey = null)
: this(
new DocumentQueryClient(client),
resourceTypeEnum,
resourceType,
documentsFeedOrDatabaseLink,
null,
feedOptions,
partitionKey)
{
}
public Type ElementType
{
get { return typeof(T); }
}
public Expression Expression { get; }
public IQueryProvider Provider
{
get { return this.queryProvider; }
}
/// <summary>
/// Gets a value indicating whether there are additional results to retrieve.
/// </summary>
public bool HasMoreResults
{
get
{
return this.queryExecutionContext == null || !this.queryExecutionContext.IsDone;
}
}
/// <summary>
/// Gets the unique ID for this instance of DocumentQuery used to correlate all activityIds generated when fetching from a partition collection.
/// </summary>
public Guid CorrelatedActivityId { get; }
public void Dispose()
{
if (this.queryExecutionContext != null)
{
this.queryExecutionContext.Dispose();
DefaultTrace.TraceInformation(
string.Format(
CultureInfo.InvariantCulture,
"{0}, CorrelatedActivityId: {1} | Disposing DocumentQuery",
DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture),
this.CorrelatedActivityId));
}
}
/// <summary>
/// Executes the query to retrieve the next page of results.
/// </summary>
public Task<DocumentFeedResponse<dynamic>> ExecuteNextAsync(CancellationToken cancellationToken = default)
{
return this.ExecuteNextAsync<dynamic>(cancellationToken);
}
/// <summary>
/// Executes the query to retrieve the next page of results.
/// </summary>
public Task<DocumentFeedResponse<TResponse>> ExecuteNextAsync<TResponse>(CancellationToken cancellationToken = default)
{
try
{
if (!this.tracedFirstExecution)
{
DefaultTrace.TraceInformation(string.Format(
CultureInfo.InvariantCulture,
"{0}, CorrelatedActivityId: {1} | First ExecuteNextAsync",
DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture),
this.CorrelatedActivityId));
this.tracedFirstExecution = true;
}
this.executeNextAysncMetrics.Start();
return TaskHelper.InlineIfPossible(() => this.ExecuteNextPrivateAsync<TResponse>(cancellationToken), null, cancellationToken);
}
finally
{
this.executeNextAysncMetrics.Stop();
if (!this.HasMoreResults && !this.tracedLastExecution)
{
DefaultTrace.TraceInformation(
string.Format(
CultureInfo.InvariantCulture,
"{0}, CorrelatedActivityId: {1} | Last ExecuteNextAsync with ExecuteNextAsyncMetrics: [{2}]",
DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture),
this.CorrelatedActivityId,
this.executeNextAysncMetrics));
this.tracedLastExecution = true;
}
}
}
/// <summary>
/// Retrieves an object that can iterate through the individual results of the query.
/// </summary>
/// <remarks>
/// This triggers a synchronous multi-page load.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
using (IDocumentQueryExecutionContext localQueryExecutionContext =
#pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
TaskHelper.InlineIfPossible(() => this.CreateDocumentQueryExecutionContextAsync(false, CancellationToken.None), null).Result)
#pragma warning restore VSTHRD002 // Avoid problematic synchronous waits
{
while (!localQueryExecutionContext.IsDone)
{
#pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
DocumentFeedResponse<CosmosElement> feedResponse = TaskHelper.InlineIfPossible(() => localQueryExecutionContext.ExecuteNextFeedResponseAsync(CancellationToken.None), null).Result;
#pragma warning restore VSTHRD002 // Avoid problematic synchronous waits
DocumentFeedResponse<T> typedFeedResponse = FeedResponseBinder.ConvertCosmosElementFeed<T>(
feedResponse,
this.resourceTypeEnum,
this.feedOptions.JsonSerializerSettings);
foreach (T item in typedFeedResponse)
{
yield return item;
}
}
}
}
/// <summary>
/// Synchronous Multi-Page load
/// </summary>
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public override string ToString()
{
LinqQueryOperation linqQueryOperation = DocumentQueryEvaluator.Evaluate(this.Expression);
if (linqQueryOperation.SqlQuerySpec != null)
{
return JsonConvert.SerializeObject(linqQueryOperation.SqlQuerySpec);
}
return new Uri(this.client.ServiceEndpoint, this.documentsFeedOrDatabaseLink).ToString();
}
private Task<IDocumentQueryExecutionContext> CreateDocumentQueryExecutionContextAsync(bool isContinuationExpected, CancellationToken cancellationToken)
{
return DocumentQueryExecutionContextFactory.CreateDocumentQueryExecutionContextAsync(
this.client,
this.resourceTypeEnum,
this.resourceType,
this.Expression,
this.feedOptions,
this.documentsFeedOrDatabaseLink,
isContinuationExpected,
cancellationToken,
this.CorrelatedActivityId);
}
internal async Task<List<T>> ExecuteAllAsync(CancellationToken cancellationToken = default)
{
List<T> result = new List<T>();
using (IDocumentQueryExecutionContext localQueryExecutionContext =
await TaskHelper.InlineIfPossible(() => this.CreateDocumentQueryExecutionContextAsync(false, cancellationToken), null, cancellationToken))
{
while (!localQueryExecutionContext.IsDone)
{
DocumentFeedResponse<T> partialResult = await (dynamic)TaskHelper.InlineIfPossible(() => localQueryExecutionContext.ExecuteNextFeedResponseAsync(cancellationToken), null, cancellationToken);
result.AddRange(partialResult);
}
}
return result;
}
private async Task<DocumentFeedResponse<TResponse>> ExecuteNextPrivateAsync<TResponse>(CancellationToken cancellationToken)
{
if (this.queryExecutionContext == null)
{
this.queryExecutionContext = await this.CreateDocumentQueryExecutionContextAsync(true, cancellationToken);
}
else if (this.queryExecutionContext.IsDone)
{
this.queryExecutionContext.Dispose();
this.queryExecutionContext = await this.CreateDocumentQueryExecutionContextAsync(true, cancellationToken);
}
DocumentFeedResponse<CosmosElement> response = await this.queryExecutionContext.ExecuteNextFeedResponseAsync(cancellationToken);
DocumentFeedResponse<TResponse> typedFeedResponse = FeedResponseBinder.ConvertCosmosElementFeed<TResponse>(
response,
this.resourceTypeEnum,
this.feedOptions.JsonSerializerSettings);
if (!this.HasMoreResults && !this.tracedLastExecution)
{
DefaultTrace.TraceInformation(
string.Format(
CultureInfo.InvariantCulture,
"{0}, CorrelatedActivityId: {1} | Last ExecuteNextAsync with ExecuteNextAsyncMetrics: [{2}]",
DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture),
this.CorrelatedActivityId,
this.executeNextAysncMetrics));
this.tracedLastExecution = true;
}
return typedFeedResponse;
}
}
}