in Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs [29:220]
public ChangeFeedIteratorCore(
IDocumentContainer documentContainer,
ChangeFeedMode changeFeedMode,
ChangeFeedRequestOptions changeFeedRequestOptions,
ChangeFeedStartFrom changeFeedStartFrom,
CosmosClientContext clientContext,
ContainerInternal container,
ChangeFeedQuerySpec changeFeedQuerySpec = null)
{
if (changeFeedStartFrom == null)
{
throw new ArgumentNullException(nameof(changeFeedStartFrom));
}
if (changeFeedMode == null)
{
throw new ArgumentNullException(nameof(changeFeedMode));
}
this.container = container;
this.clientContext = clientContext;
this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
this.changeFeedRequestOptions = changeFeedRequestOptions ?? new ChangeFeedRequestOptions();
this.changeFeedQuerySpec = changeFeedQuerySpec;
this.operationName = OpenTelemetryConstants.Operations.QueryChangeFeed;
this.lazyMonadicEnumerator = new AsyncLazy<TryCatch<CrossPartitionChangeFeedAsyncEnumerator>>(
valueFactory: async (trace, cancellationToken) =>
{
if (changeFeedStartFrom is ChangeFeedStartFromContinuation startFromContinuation)
{
TryCatch<CosmosElement> monadicParsedToken = CosmosElement.Monadic.Parse(startFromContinuation.Continuation);
if (monadicParsedToken.Failed)
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Failed to parse continuation token: {startFromContinuation.Continuation}.",
innerException: monadicParsedToken.Exception));
}
TryCatch<VersionedAndRidCheckedCompositeToken> monadicVersionedToken = VersionedAndRidCheckedCompositeToken
.MonadicCreateFromCosmosElement(monadicParsedToken.Result);
if (monadicVersionedToken.Failed)
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Failed to parse continuation token: {startFromContinuation.Continuation}.",
innerException: monadicVersionedToken.Exception));
}
VersionedAndRidCheckedCompositeToken versionedAndRidCheckedCompositeToken = monadicVersionedToken.Result;
if (versionedAndRidCheckedCompositeToken.VersionNumber == VersionedAndRidCheckedCompositeToken.Version.V1)
{
// Need to migrate continuation token
if (!(versionedAndRidCheckedCompositeToken.ContinuationToken is CosmosArray cosmosArray))
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Failed to parse get array continuation token: {startFromContinuation.Continuation}."));
}
List<CosmosElement> changeFeedTokensV2 = new List<CosmosElement>();
foreach (CosmosElement arrayItem in cosmosArray)
{
if (!(arrayItem is CosmosObject cosmosObject))
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Failed to parse get object in composite continuation: {startFromContinuation.Continuation}."));
}
if (!cosmosObject.TryGetValue("range", out CosmosElement rangeItem))
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Failed to parse token: {cosmosObject}."));
}
if (!(rangeItem is CosmosObject rangeElement))
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Failed to parse get object in composite continuation: {startFromContinuation.Continuation}."));
}
if (!rangeElement.TryGetValue("min", out CosmosString min))
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Failed to parse start of range: {cosmosObject}."));
}
if (!rangeElement.TryGetValue("max", out CosmosString max))
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Failed to parse end of range: {cosmosObject}."));
}
if (!cosmosObject.TryGetValue("token", out CosmosElement token))
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Failed to parse token: {cosmosObject}."));
}
FeedRangeEpk feedRangeEpk = new FeedRangeEpk(new Documents.Routing.Range<string>(
min: min.Value,
max: max.Value,
isMinInclusive: true,
isMaxInclusive: false));
ChangeFeedState state = token is CosmosNull ? ChangeFeedState.Beginning() : ChangeFeedStateContinuation.Continuation(token);
FeedRangeState<ChangeFeedState> feedRangeState = new FeedRangeState<ChangeFeedState>(feedRangeEpk, state);
changeFeedTokensV2.Add(ChangeFeedFeedRangeStateSerializer.ToCosmosElement(feedRangeState));
}
CosmosArray changeFeedTokensArrayV2 = CosmosArray.Create(changeFeedTokensV2);
versionedAndRidCheckedCompositeToken = new VersionedAndRidCheckedCompositeToken(
VersionedAndRidCheckedCompositeToken.Version.V2,
changeFeedTokensArrayV2,
versionedAndRidCheckedCompositeToken.Rid);
}
if (versionedAndRidCheckedCompositeToken.VersionNumber != VersionedAndRidCheckedCompositeToken.Version.V2)
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Wrong version number: {versionedAndRidCheckedCompositeToken.VersionNumber}."));
}
string collectionRid = await documentContainer.GetResourceIdentifierAsync(trace, cancellationToken);
if (versionedAndRidCheckedCompositeToken.Rid != collectionRid)
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"rids mismatched. Expected: {collectionRid} but got {versionedAndRidCheckedCompositeToken.Rid}."));
}
changeFeedStartFrom = ChangeFeedStartFrom.ContinuationToken(versionedAndRidCheckedCompositeToken.ContinuationToken.ToString());
}
TryCatch<ChangeFeedCrossFeedRangeState> monadicChangeFeedCrossFeedRangeState = changeFeedStartFrom.Accept(ChangeFeedStateFromToChangeFeedCrossFeedRangeState.Singleton);
if (monadicChangeFeedCrossFeedRangeState.Failed)
{
return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
new MalformedChangeFeedContinuationTokenException(
message: $"Could not convert to {nameof(ChangeFeedCrossFeedRangeState)}.",
innerException: monadicChangeFeedCrossFeedRangeState.Exception));
}
Dictionary<string, string> additionalHeaders;
if (changeFeedRequestOptions?.Properties != null)
{
additionalHeaders = new Dictionary<string, string>();
Dictionary<string, object> nonStringHeaders = new Dictionary<string, object>();
foreach (KeyValuePair<string, object> keyValuePair in changeFeedRequestOptions.Properties)
{
if (keyValuePair.Value is string stringValue)
{
additionalHeaders[keyValuePair.Key] = stringValue;
}
else
{
nonStringHeaders[keyValuePair.Key] = keyValuePair.Value;
}
}
changeFeedRequestOptions.Properties = nonStringHeaders;
}
else
{
additionalHeaders = null;
}
CrossPartitionChangeFeedAsyncEnumerator enumerator = CrossPartitionChangeFeedAsyncEnumerator.Create(
documentContainer,
new CrossFeedRangeState<ChangeFeedState>(monadicChangeFeedCrossFeedRangeState.Result.FeedRangeStates),
new ChangeFeedExecutionOptions(
changeFeedMode,
changeFeedRequestOptions?.PageSizeHint,
changeFeedRequestOptions?.JsonSerializationFormatOptions?.JsonSerializationFormat,
additionalHeaders,
this.changeFeedQuerySpec));
TryCatch<CrossPartitionChangeFeedAsyncEnumerator> monadicEnumerator = TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromResult(enumerator);
return monadicEnumerator;
});
this.hasMoreResults = true;
}