in Microsoft.Azure.Cosmos/src/ReadFeed/ReadFeedIteratorCore.cs [214:364]
/// <summary>
/// Advances the underlying cross-partition read feed enumerator by one page and
/// wraps the outcome in a <see cref="ResponseMessage"/>.
/// </summary>
/// <param name="trace">Trace handed to the enumerator and attached to the returned response. Must not be null.</param>
/// <param name="cancellationToken">Token forwarded to the enumerator's <c>MoveNextAsync</c> call.</param>
/// <returns>
/// On success, a 200 (OK) response carrying the page content, request charge, activity id,
/// the page's additional headers, and a continuation token serialized through
/// <see cref="FeedRangeCompositeContinuation"/> (or a null token when the page has no state).
/// When a failure can be converted to a <see cref="CosmosException"/>, a response carrying that
/// exception is returned instead of throwing.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="trace"/> is null.</exception>
/// <exception cref="InvalidOperationException">
/// The iterator has no more results, or the enumerator reports that it has no further pages.
/// </exception>
/// <exception cref="CosmosOperationCanceledException">
/// The enumerator raised an <see cref="OperationCanceledException"/> that was not already a
/// <see cref="CosmosOperationCanceledException"/>.
/// </exception>
/// <remarks>
/// A stored failure that cannot be converted to a <see cref="CosmosException"/> is rethrown as-is,
/// with its originally captured stack trace preserved.
/// </remarks>
public override async Task<ResponseMessage> ReadNextAsync(
    ITrace trace,
    CancellationToken cancellationToken = default)
{
    if (trace == null)
    {
        throw new ArgumentNullException(nameof(trace));
    }

    if (!this.hasMoreResults)
    {
        throw new InvalidOperationException("Should not be calling FeedIterator that does not have any more results");
    }

    if (this.monadicEnumerator.Failed)
    {
        // The stored enumerator result is a failure, so no page can ever be produced.
        this.hasMoreResults = false;
        if (!ExceptionToCosmosException.TryCreateFromException(this.monadicEnumerator.Exception, trace, out CosmosException cosmosException))
        {
            // Rethrow the stored exception without overwriting the stack trace it was captured with.
            // Throw() never returns.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(this.monadicEnumerator.Exception).Throw();
        }

        return new ResponseMessage(
            statusCode: System.Net.HttpStatusCode.BadRequest,
            requestMessage: null,
            headers: cosmosException.Headers,
            cosmosException: cosmosException,
            trace: trace);
    }

    CrossPartitionReadFeedAsyncEnumerator enumerator = this.monadicEnumerator.Result;
    TryCatch<CrossFeedRangePage<Pagination.ReadFeedPage, ReadFeedState>> monadicPage;
    try
    {
        if (!await enumerator.MoveNextAsync(trace, cancellationToken))
        {
            throw new InvalidOperationException("Should not be calling enumerator that does not have any more results");
        }

        monadicPage = enumerator.Current;
    }
    catch (OperationCanceledException ex) when (!(ex is CosmosOperationCanceledException))
    {
        // Wrap plain cancellations so the trace travels with the exception.
        throw new CosmosOperationCanceledException(ex, trace);
    }

    if (monadicPage.Failed)
    {
        if (!ExceptionToCosmosException.TryCreateFromException(monadicPage.Exception, trace, out CosmosException cosmosException))
        {
            // Same as above: keep the original stack trace of the stored page failure.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(monadicPage.Exception).Throw();
        }

        // Only a retriable failure leaves the iterator open for another ReadNextAsync call.
        if (!IsRetriableException(cosmosException))
        {
            this.hasMoreResults = false;
        }

        return new ResponseMessage(
            statusCode: cosmosException.StatusCode,
            requestMessage: null,
            headers: cosmosException.Headers,
            cosmosException: cosmosException,
            trace: trace);
    }

    CrossFeedRangePage<Pagination.ReadFeedPage, ReadFeedState> crossFeedRangePage = monadicPage.Result;
    CrossFeedRangeState<ReadFeedState> crossFeedRangeState = crossFeedRangePage.State;

    // Make the continuation token match the older format:
    string continuationToken;
    if (crossFeedRangeState == null)
    {
        // A page without state is the last page.
        this.hasMoreResults = false;
        continuationToken = null;
    }
    else
    {
        int feedRangeCount = crossFeedRangeState.Value.Length;
        List<CompositeContinuationToken> compositeContinuationTokens = new List<CompositeContinuationToken>(feedRangeCount);
        for (int i = 0; i < feedRangeCount; i++)
        {
            // Span is indexed inline; a Span local cannot be declared in an async method before C# 13.
            FeedRangeState<ReadFeedState> feedRangeState = crossFeedRangeState.Value.Span[i];

            // Any feed range that is not already an EPK range is represented as the full range.
            FeedRangeEpk feedRange = (feedRangeState.FeedRange as FeedRangeEpk) ?? FeedRangeEpk.FullRange;

            ReadFeedState readFeedState = feedRangeState.State;
            CompositeContinuationToken compositeContinuationToken = new CompositeContinuationToken()
            {
                Range = feedRange.Range,
                // A beginning state has no token; every other state is cast to a continuation state.
                Token = readFeedState is ReadFeedBeginningState ? null : ((ReadFeedContinuationState)readFeedState).ContinuationToken.ToString(),
            };
            compositeContinuationTokens.Add(compositeContinuationToken);
        }

        // Outer feed range precedence: partition key from the request options, then their
        // FeedRange, otherwise the full range.
        FeedRangeInternal outerFeedRange;
        if ((this.queryRequestOptions != null) && this.queryRequestOptions.PartitionKey.HasValue)
        {
            outerFeedRange = new FeedRangePartitionKey(this.queryRequestOptions.PartitionKey.Value);
        }
        else if ((this.queryRequestOptions != null) && (this.queryRequestOptions.FeedRange != null))
        {
            outerFeedRange = (FeedRangeInternal)this.queryRequestOptions.FeedRange;
        }
        else
        {
            outerFeedRange = FeedRangeEpk.FullRange;
        }

        FeedRangeCompositeContinuation feedRangeCompositeContinuation = new FeedRangeCompositeContinuation(
            containerRid: string.Empty,
            feedRange: outerFeedRange,
            compositeContinuationTokens);
        continuationToken = feedRangeCompositeContinuation.ToString();
    }

    Pagination.ReadFeedPage page = crossFeedRangePage.Page;
    Headers headers = new Headers()
    {
        RequestCharge = page.RequestCharge,
        ActivityId = page.ActivityId,
        ContinuationToken = continuationToken,
    };

    // Additional headers are copied last, so they overwrite any header set above with the same key.
    foreach (KeyValuePair<string, string> kvp in page.AdditionalHeaders)
    {
        headers[kvp.Key] = kvp.Value;
    }

    return new ResponseMessage(
        statusCode: System.Net.HttpStatusCode.OK,
        requestMessage: default,
        headers: headers,
        cosmosException: default,
        trace: trace)
    {
        Content = page.Content,
    };
}