in Microsoft.Azure.Cosmos/src/Linq/BuiltinFunctions/ChangeFeedQuery.cs [23:211]
where TResource : new()
{
#region Fields
private const string IfNoneMatchAllHeaderValue = "*"; // This means start from current.
private readonly ResourceType resourceType;
private readonly DocumentClient client;
private readonly string resourceLink;
private readonly ChangeFeedOptions feedOptions;
private readonly string ifModifiedSince;
private HttpStatusCode lastStatusCode = HttpStatusCode.OK;
private string nextIfNoneMatch;
#endregion Fields
#region Constructor
public ChangeFeedQuery(DocumentClient client, ResourceType resourceType, string resourceLink, ChangeFeedOptions feedOptions)
{
Debug.Assert(client != null);
this.client = client;
this.resourceType = resourceType;
this.resourceLink = resourceLink;
this.feedOptions = feedOptions ?? new ChangeFeedOptions();
if (feedOptions.PartitionKey != null && !string.IsNullOrEmpty(feedOptions.PartitionKeyRangeId))
{
throw new ArgumentException(RMResources.PartitionKeyAndPartitionKeyRangeRangeIdBothSpecified, "feedOptions");
}
bool canUseStartFromBeginning = true;
if (feedOptions.RequestContinuation != null)
{
this.nextIfNoneMatch = feedOptions.RequestContinuation;
canUseStartFromBeginning = false;
}
if (feedOptions.StartTime.HasValue)
{
this.ifModifiedSince = this.ConvertToHttpTime(feedOptions.StartTime.Value);
canUseStartFromBeginning = false;
}
if (canUseStartFromBeginning && !feedOptions.StartFromBeginning)
{
this.nextIfNoneMatch = IfNoneMatchAllHeaderValue;
}
}
#endregion Constructor
#region IDisposable
public void Dispose()
{
}
#endregion IDisposable
#region IDocumentQuery<TResource>
/// <summary>
/// Gets a value indicating whether there are potentially additional results that can be retrieved.
/// </summary>
/// <value>Boolean value representing if whether there are potentially additional results that can be retrieved.</value>
/// <remarks>Initially returns true. This value is set based on whether the last execution returned a continuation token.</remarks>
public bool HasMoreResults => this.lastStatusCode != HttpStatusCode.NotModified;
/// <summary>
/// Read feed and retrieves the next page of results in the Azure Cosmos DB service.
/// </summary>
/// <typeparam name="TResult">The type of the object returned in the query result.</typeparam>
/// <returns>The Task object for the asynchronous response from query execution.</returns>
public Task<DocumentFeedResponse<TResult>> ExecuteNextAsync<TResult>(CancellationToken cancellationToken = default)
{
return this.ReadDocumentChangeFeedAsync<TResult>(this.resourceLink, cancellationToken);
}
/// <summary>
/// Executes the query and retrieves the next page of results as dynamic objects in the Azure Cosmos DB service.
/// </summary>
/// <param name="cancellationToken">(Optional) The <see cref="CancellationToken"/> allows for notification that operations should be cancelled.</param>
/// <returns>The Task object for the asynchronous response from query execution.</returns>
public Task<DocumentFeedResponse<dynamic>> ExecuteNextAsync(CancellationToken cancellationToken = default)
{
return this.ExecuteNextAsync<dynamic>(cancellationToken);
}
#endregion IDocumentQuery<TResource>
#region Private
public Task<DocumentFeedResponse<TResult>> ReadDocumentChangeFeedAsync<TResult>(string resourceLink, CancellationToken cancellationToken)
{
IDocumentClientRetryPolicy retryPolicy = this.client.ResetSessionTokenRetryPolicy.GetRequestPolicy();
return TaskHelper.InlineIfPossible(
() => this.ReadDocumentChangeFeedPrivateAsync<TResult>(resourceLink, retryPolicy, cancellationToken), retryPolicy, cancellationToken);
}
private async Task<DocumentFeedResponse<TResult>> ReadDocumentChangeFeedPrivateAsync<TResult>(string link, IDocumentClientRetryPolicy retryPolicyInstance, CancellationToken cancellationToken)
{
using (DocumentServiceResponse response = await this.GetFeedResponseAsync(link, this.resourceType, retryPolicyInstance, cancellationToken))
{
this.lastStatusCode = response.StatusCode;
this.nextIfNoneMatch = response.Headers[HttpConstants.HttpHeaders.ETag];
if (response.ResponseBody != null && response.ResponseBody.Length > 0)
{
long responseLengthInBytes = response.ResponseBody.Length;
IEnumerable<dynamic> feedResource = response.GetQueryResponse(typeof(TResource), out int itemCount);
DocumentFeedResponse<dynamic> feedResponse = new DocumentFeedResponse<dynamic>(
feedResource,
itemCount,
response.Headers,
true,
null,
response.RequestStats,
responseLengthBytes: responseLengthInBytes);
return (dynamic)feedResponse;
}
else
{
return new DocumentFeedResponse<TResult>(
Enumerable.Empty<TResult>(),
0,
response.Headers,
true,
null,
response.RequestStats);
}
}
}
private async Task<DocumentServiceResponse> GetFeedResponseAsync(string resourceLink, ResourceType resourceType, IDocumentClientRetryPolicy retryPolicyInstance, CancellationToken cancellationToken)
{
RequestNameValueCollection headers = new RequestNameValueCollection();
if (this.feedOptions.MaxItemCount.HasValue)
{
headers.PageSize = this.feedOptions.MaxItemCount.ToString();
}
if (this.feedOptions.SessionToken != null)
{
headers.SessionToken = this.feedOptions.SessionToken;
}
if (resourceType.IsPartitioned() && this.feedOptions.PartitionKeyRangeId == null && this.feedOptions.PartitionKey == null)
{
throw new ForbiddenException(RMResources.PartitionKeyRangeIdOrPartitionKeyMustBeSpecified);
}
// On REST level, change feed is using IfNoneMatch/ETag instead of continuation.
if (this.nextIfNoneMatch != null)
{
headers.IfNoneMatch = this.nextIfNoneMatch;
}
if (this.ifModifiedSince != null)
{
headers.IfModifiedSince = this.ifModifiedSince;
}
headers.Set(HttpConstants.HttpHeaders.A_IM, HttpConstants.A_IMHeaderValues.IncrementalFeed);
if (this.feedOptions.PartitionKey != null)
{
PartitionKeyInternal partitionKey = this.feedOptions.PartitionKey.InternalKey;
headers.PartitionKey = partitionKey.ToJsonString();
}
if (this.feedOptions.IncludeTentativeWrites)
{
headers.IncludeTentativeWrites = bool.TrueString;
}
using (DocumentServiceRequest request = this.client.CreateDocumentServiceRequest(
OperationType.ReadFeed,
resourceLink,
resourceType,
headers))
{
if (resourceType.IsPartitioned() && this.feedOptions.PartitionKeyRangeId != null)
{
request.RouteTo(new PartitionKeyRangeIdentity(this.feedOptions.PartitionKeyRangeId));
}
return await this.client.ReadFeedAsync(request, retryPolicyInstance, cancellationToken);
}
}
private string ConvertToHttpTime(DateTime time)
{
return time.ToUniversalTime().ToString("r", CultureInfo.InvariantCulture);
}
#endregion Private
}