in Microsoft.Azure.Cosmos/src/ReadFeed/ReadFeedIteratorCore.cs [33:200]
public ReadFeedIteratorCore(
IDocumentContainer documentContainer,
string continuationToken,
ReadFeedExecutionOptions readFeedPaginationOptions,
QueryRequestOptions queryRequestOptions,
ContainerInternal container,
CancellationToken cancellationToken)
{
this.container = container;
this.queryRequestOptions = queryRequestOptions;
this.SetupInfoForTelemetry(
databaseName: container?.Database?.Id,
operationName: OpenTelemetryConstants.Operations.ReadFeedRanges,
operationType: OperationType.ReadFeed,
querySpec: null,
operationMetricsOptions: queryRequestOptions?.OperationMetricsOptions,
networkMetricOptions: queryRequestOptions?.NetworkMetricsOptions);
readFeedPaginationOptions ??= ReadFeedExecutionOptions.Default;
if (!string.IsNullOrEmpty(continuationToken))
{
bool isNewArrayFormat = (continuationToken.Length >= 2) && (continuationToken[0] == '[') && (continuationToken[continuationToken.Length - 1] == ']');
if (!isNewArrayFormat)
{
// One of the two older formats
if (!FeedRangeContinuation.TryParse(continuationToken, out FeedRangeContinuation feedRangeContinuation))
{
// Backward compatible with old format
feedRangeContinuation = new FeedRangeCompositeContinuation(
containerRid: string.Empty,
FeedRangeEpk.FullRange,
new List<Documents.Routing.Range<string>>()
{
new Documents.Routing.Range<string>(
Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey,
isMinInclusive: true,
isMaxInclusive: false)
},
continuationToken);
}
// need to massage it a little
List<CosmosElement> feedRangeStates = new List<CosmosElement>();
string oldContinuationFormat = feedRangeContinuation.ToString();
if (feedRangeContinuation.FeedRange is FeedRangePartitionKey feedRangePartitionKey)
{
CosmosObject cosmosObject = CosmosObject.Parse(oldContinuationFormat);
CosmosArray continuations = (CosmosArray)cosmosObject["Continuation"];
if (continuations.Count != 1)
{
throw new InvalidOperationException("Expected only one continuation for partition key queries");
}
CosmosElement continuation = continuations[0];
CosmosObject continuationObject = (CosmosObject)continuation;
CosmosElement token = continuationObject["token"];
ReadFeedState state;
if (token is CosmosNull)
{
state = ReadFeedState.Beginning();
}
else
{
CosmosString tokenAsString = (CosmosString)token;
state = ReadFeedState.Continuation(CosmosElement.Parse(tokenAsString.Value));
}
FeedRangeState<ReadFeedState> feedRangeState = new FeedRangeState<ReadFeedState>(feedRangePartitionKey, state);
feedRangeStates.Add(ReadFeedFeedRangeStateSerializer.ToCosmosElement(feedRangeState));
}
else
{
CosmosObject cosmosObject = CosmosObject.Parse(oldContinuationFormat);
CosmosArray continuations = (CosmosArray)cosmosObject["Continuation"];
foreach (CosmosElement continuation in continuations)
{
CosmosObject continuationObject = (CosmosObject)continuation;
CosmosObject rangeObject = (CosmosObject)continuationObject["range"];
string min = ((CosmosString)rangeObject["min"]).Value;
string max = ((CosmosString)rangeObject["max"]).Value;
CosmosElement token = continuationObject["token"];
FeedRangeInternal feedRange = new FeedRangeEpk(new Documents.Routing.Range<string>(min, max, isMinInclusive: true, isMaxInclusive: false));
ReadFeedState state;
if (token is CosmosNull)
{
state = ReadFeedState.Beginning();
}
else
{
CosmosString tokenAsString = (CosmosString)token;
try
{
state = ReadFeedState.Continuation(CosmosElement.Parse(tokenAsString.Value));
}
catch (Exception exception) when (exception.InnerException is JsonParseException)
{
MalformedContinuationTokenException malformedContinuationTokenException = new MalformedContinuationTokenException(exception.Message);
#pragma warning disable CDX1002 // DontUseExceptionStackTrace
throw CosmosExceptionFactory.CreateBadRequestException(
message: $"Malformed Continuation Token: {tokenAsString}.",
headers: CosmosQueryResponseMessageHeaders.ConvertToQueryHeaders(
new Headers(),
default,
default,
(int)SubStatusCodes.MalformedContinuationToken,
default),
stackTrace: exception.StackTrace,
innerException: malformedContinuationTokenException,
trace: null);
#pragma warning restore CDX1002 // DontUseExceptionStackTrace
}
}
FeedRangeState<ReadFeedState> feedRangeState = new FeedRangeState<ReadFeedState>(feedRange, state);
feedRangeStates.Add(ReadFeedFeedRangeStateSerializer.ToCosmosElement(feedRangeState));
}
}
CosmosArray cosmosArrayContinuationTokens = CosmosArray.Create(feedRangeStates);
continuationToken = cosmosArrayContinuationTokens.ToString();
}
}
TryCatch<ReadFeedCrossFeedRangeState> monadicReadFeedState;
if (continuationToken == null)
{
FeedRange feedRange;
if ((this.queryRequestOptions != null) && this.queryRequestOptions.PartitionKey.HasValue)
{
feedRange = new FeedRangePartitionKey(this.queryRequestOptions.PartitionKey.Value);
}
else if ((this.queryRequestOptions != null) && (this.queryRequestOptions.FeedRange != null))
{
feedRange = this.queryRequestOptions.FeedRange;
}
else
{
feedRange = FeedRangeEpk.FullRange;
}
monadicReadFeedState = TryCatch<ReadFeedCrossFeedRangeState>.FromResult(ReadFeedCrossFeedRangeState.CreateFromBeginning(feedRange));
}
else
{
monadicReadFeedState = ReadFeedCrossFeedRangeState.Monadic.Parse(continuationToken);
}
if (monadicReadFeedState.Failed)
{
this.monadicEnumerator = TryCatch<CrossPartitionReadFeedAsyncEnumerator>.FromException(monadicReadFeedState.Exception);
}
else
{
this.monadicEnumerator = TryCatch<CrossPartitionReadFeedAsyncEnumerator>.FromResult(
CrossPartitionReadFeedAsyncEnumerator.Create(
documentContainer,
new CrossFeedRangeState<ReadFeedState>(monadicReadFeedState.Result.FeedRangeStates),
readFeedPaginationOptions));
}
this.hasMoreResults = true;
}