in Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs [58:197]
public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cancellationToken)
{
if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
}
using (ITrace childTrace = trace.StartChild(name: nameof(MoveNextAsync), component: TraceComponent.Pagination, level: TraceLevel.Info))
{
IQueue<PartitionRangePageAsyncEnumerator<TPage, TState>> enumerators = await this.lazyEnumerators.GetValueAsync(
childTrace,
cancellationToken);
if (enumerators.Count == 0)
{
this.Current = default;
this.CurrentRange = default;
this.nextState = default;
return false;
}
PartitionRangePageAsyncEnumerator<TPage, TState> currentPaginator = enumerators.Dequeue();
bool moveNextResult = false;
try
{
moveNextResult = await currentPaginator.MoveNextAsync(childTrace, cancellationToken);
}
catch
{
// Re-queue the enumerator to avoid emptying the queue
enumerators.Enqueue(currentPaginator);
throw;
}
if (!moveNextResult)
{
// Current enumerator is empty,
// so recursively retry on the next enumerator.
return await this.MoveNextAsync(childTrace, cancellationToken);
}
if (currentPaginator.Current.Failed)
{
// Check if it's a retryable exception.
Exception exception = currentPaginator.Current.Exception;
while (exception.InnerException != null)
{
exception = exception.InnerException;
}
if (IsSplitException(exception))
{
// Handle split
List<FeedRangeEpk> childRanges = await this.feedRangeProvider.GetChildRangeAsync(
currentPaginator.FeedRangeState.FeedRange,
childTrace,
cancellationToken);
if (childRanges.Count <= 1)
{
// We optimistically assumed that the cache is not stale.
// In the event that it is (where we only get back one child / the partition that we think got split)
// Then we need to refresh the cache
await this.feedRangeProvider.RefreshProviderAsync(childTrace, cancellationToken);
childRanges = await this.feedRangeProvider.GetChildRangeAsync(
currentPaginator.FeedRangeState.FeedRange,
childTrace,
cancellationToken);
}
if (childRanges.Count < 1)
{
string errorMessage = "SDK invariant violated 4795CC37: Must have at least one EPK range in a cross partition enumerator";
throw Resource.CosmosExceptions.CosmosExceptionFactory.CreateInternalServerErrorException(
message: errorMessage,
headers: null,
stackTrace: null,
trace: childTrace,
error: new Microsoft.Azure.Documents.Error { Code = "SDK_invariant_violated_4795CC37", Message = errorMessage });
}
if (childRanges.Count == 1)
{
// On a merge, the 410/1002 results in a single parent
// We maintain the current enumerator's range and let the RequestInvokerHandler logic kick in
enumerators.Enqueue(currentPaginator);
}
else
{
// Split
foreach (FeedRangeInternal childRange in childRanges)
{
PartitionRangePageAsyncEnumerator<TPage, TState> childPaginator = this.createPartitionRangeEnumerator(
new FeedRangeState<TState>(childRange, currentPaginator.FeedRangeState.State));
enumerators.Enqueue(childPaginator);
}
}
// Recursively retry
return await this.MoveNextAsync(childTrace, cancellationToken);
}
// Just enqueue the paginator and the user can decide if they want to retry.
enumerators.Enqueue(currentPaginator);
this.Current = TryCatch<CrossFeedRangePage<TPage, TState>>.FromException(currentPaginator.Current.Exception);
this.CurrentRange = currentPaginator.FeedRangeState.FeedRange;
this.nextState = CrossPartitionRangePageAsyncEnumerator<TPage, TState>.GetNextRange(enumerators);
return true;
}
if (currentPaginator.FeedRangeState.State != default)
{
// Don't enqueue the paginator otherwise it's an infinite loop.
enumerators.Enqueue(currentPaginator);
}
CrossFeedRangeState<TState> crossPartitionState;
if (enumerators.Count == 0)
{
crossPartitionState = null;
}
else
{
FeedRangeState<TState>[] feedRangeAndStates = new FeedRangeState<TState>[enumerators.Count];
int i = 0;
foreach (PartitionRangePageAsyncEnumerator<TPage, TState> enumerator in enumerators)
{
feedRangeAndStates[i++] = enumerator.FeedRangeState;
}
crossPartitionState = new CrossFeedRangeState<TState>(feedRangeAndStates);
}
this.Current = TryCatch<CrossFeedRangePage<TPage, TState>>.FromResult(
new CrossFeedRangePage<TPage, TState>(currentPaginator.Current.Result, crossPartitionState));
this.CurrentRange = currentPaginator.FeedRangeState.FeedRange;
this.nextState = CrossPartitionRangePageAsyncEnumerator<TPage, TState>.GetNextRange(enumerators);
return true;
}
}