public ChangeFeedIteratorCore()

in Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs [29:220]


        /// <summary>
        /// Initializes the iterator and registers a lazy factory that builds the
        /// <see cref="CrossPartitionChangeFeedAsyncEnumerator"/> the first time it is needed.
        /// </summary>
        /// <param name="documentContainer">Container abstraction used to resolve the collection rid and to create the enumerator. Must not be null.</param>
        /// <param name="changeFeedMode">Change feed mode forwarded to the execution options. Must not be null.</param>
        /// <param name="changeFeedRequestOptions">Optional request options. When null, a new default instance is stored on the iterator.</param>
        /// <param name="changeFeedStartFrom">Where to start reading from. Must not be null.</param>
        /// <param name="clientContext">Client context stored on the iterator; not validated here.</param>
        /// <param name="container">Container stored on the iterator; not validated here.</param>
        /// <param name="changeFeedQuerySpec">Optional query spec forwarded to the execution options.</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="changeFeedStartFrom"/>, <paramref name="changeFeedMode"/> or
        /// <paramref name="documentContainer"/> is null.
        /// </exception>
        /// <remarks>
        /// Nothing in the factory runs during construction. Continuation token problems are not thrown;
        /// they are returned as a failed <c>TryCatch</c> wrapping a <see cref="MalformedChangeFeedContinuationTokenException"/>.
        /// When the factory runs and <paramref name="changeFeedRequestOptions"/> has non-null properties, the
        /// string-valued entries are moved to additional headers and the options' <c>Properties</c> is replaced
        /// with the remaining entries, i.e. the caller-supplied instance is modified.
        /// </remarks>
        public ChangeFeedIteratorCore(
            IDocumentContainer documentContainer,
            ChangeFeedMode changeFeedMode,
            ChangeFeedRequestOptions changeFeedRequestOptions,
            ChangeFeedStartFrom changeFeedStartFrom,
            CosmosClientContext clientContext,
            ContainerInternal container,
            ChangeFeedQuerySpec changeFeedQuerySpec = null)
        {
            if (changeFeedStartFrom == null)
            {
                throw new ArgumentNullException(nameof(changeFeedStartFrom));
            }

            if (changeFeedMode == null)
            {
                throw new ArgumentNullException(nameof(changeFeedMode));
            }

            this.container = container;
            this.clientContext = clientContext;
            this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
            this.changeFeedRequestOptions = changeFeedRequestOptions ?? new ChangeFeedRequestOptions();
            this.changeFeedQuerySpec = changeFeedQuerySpec;

            this.operationName = OpenTelemetryConstants.Operations.QueryChangeFeed;

            this.lazyMonadicEnumerator = new AsyncLazy<TryCatch<CrossPartitionChangeFeedAsyncEnumerator>>(
                valueFactory: async (trace, cancellationToken) =>
                {
                    if (changeFeedStartFrom is ChangeFeedStartFromContinuation startFromContinuation)
                    {
                        TryCatch<CosmosElement> monadicParsedToken = CosmosElement.Monadic.Parse(startFromContinuation.Continuation);
                        if (monadicParsedToken.Failed)
                        {
                            return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
                                new MalformedChangeFeedContinuationTokenException(
                                    message: $"Failed to parse continuation token: {startFromContinuation.Continuation}.",
                                    innerException: monadicParsedToken.Exception));
                        }

                        TryCatch<VersionedAndRidCheckedCompositeToken> monadicVersionedToken = VersionedAndRidCheckedCompositeToken
                            .MonadicCreateFromCosmosElement(monadicParsedToken.Result);
                        if (monadicVersionedToken.Failed)
                        {
                            return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
                                new MalformedChangeFeedContinuationTokenException(
                                    message: $"Failed to parse continuation token: {startFromContinuation.Continuation}.",
                                    innerException: monadicVersionedToken.Exception));
                        }

                        VersionedAndRidCheckedCompositeToken versionedAndRidCheckedCompositeToken = monadicVersionedToken.Result;
                        if (versionedAndRidCheckedCompositeToken.VersionNumber == VersionedAndRidCheckedCompositeToken.Version.V1)
                        {
                            // Need to migrate continuation token
                            if (!ChangeFeedIteratorCore.TryMigrateV1ContinuationToken(
                                versionedAndRidCheckedCompositeToken,
                                startFromContinuation.Continuation,
                                out VersionedAndRidCheckedCompositeToken migratedToken,
                                out MalformedChangeFeedContinuationTokenException migrationFailure))
                            {
                                return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(migrationFailure);
                            }

                            versionedAndRidCheckedCompositeToken = migratedToken;
                        }

                        // After the optional migration only V2 tokens are accepted.
                        if (versionedAndRidCheckedCompositeToken.VersionNumber != VersionedAndRidCheckedCompositeToken.Version.V2)
                        {
                            return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
                                new MalformedChangeFeedContinuationTokenException(
                                    message: $"Wrong version number: {versionedAndRidCheckedCompositeToken.VersionNumber}."));
                        }

                        // The token must have been issued for the collection this iterator reads from.
                        string collectionRid = await documentContainer.GetResourceIdentifierAsync(trace, cancellationToken);
                        if (versionedAndRidCheckedCompositeToken.Rid != collectionRid)
                        {
                            return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
                                new MalformedChangeFeedContinuationTokenException(
                                    message: $"rids mismatched. Expected: {collectionRid} but got {versionedAndRidCheckedCompositeToken.Rid}."));
                        }

                        // Continue with the inner (unwrapped, V2) continuation token.
                        changeFeedStartFrom = ChangeFeedStartFrom.ContinuationToken(versionedAndRidCheckedCompositeToken.ContinuationToken.ToString());
                    }

                    TryCatch<ChangeFeedCrossFeedRangeState> monadicChangeFeedCrossFeedRangeState = changeFeedStartFrom.Accept(ChangeFeedStateFromToChangeFeedCrossFeedRangeState.Singleton);
                    if (monadicChangeFeedCrossFeedRangeState.Failed)
                    {
                        return TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromException(
                            new MalformedChangeFeedContinuationTokenException(
                                message: $"Could not convert to {nameof(ChangeFeedCrossFeedRangeState)}.",
                                innerException: monadicChangeFeedCrossFeedRangeState.Exception));
                    }

                    Dictionary<string, string> additionalHeaders = ChangeFeedIteratorCore.ExtractStringPropertiesAsHeaders(changeFeedRequestOptions);

                    CrossPartitionChangeFeedAsyncEnumerator enumerator = CrossPartitionChangeFeedAsyncEnumerator.Create(
                        documentContainer,
                        new CrossFeedRangeState<ChangeFeedState>(monadicChangeFeedCrossFeedRangeState.Result.FeedRangeStates),
                        new ChangeFeedExecutionOptions(
                            changeFeedMode,
                            changeFeedRequestOptions?.PageSizeHint,
                            changeFeedRequestOptions?.JsonSerializationFormatOptions?.JsonSerializationFormat,
                            additionalHeaders,
                            this.changeFeedQuerySpec));

                    TryCatch<CrossPartitionChangeFeedAsyncEnumerator> monadicEnumerator = TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromResult(enumerator);
                    return monadicEnumerator;
                });
            this.hasMoreResults = true;
        }

        /// <summary>
        /// Rewrites a V1 composite continuation token into the V2 format, keeping the original rid.
        /// </summary>
        /// <param name="v1Token">The token to migrate; its continuation is expected to be an array of objects with "range" and "token" properties.</param>
        /// <param name="rawContinuation">The raw continuation string, used only to build error messages.</param>
        /// <param name="v2Token">On success, the migrated V2 token; otherwise the default value.</param>
        /// <param name="failure">On failure, the exception describing the malformed part of the token; otherwise null.</param>
        /// <returns>True when every element of the V1 token was migrated; false otherwise.</returns>
        private static bool TryMigrateV1ContinuationToken(
            VersionedAndRidCheckedCompositeToken v1Token,
            string rawContinuation,
            out VersionedAndRidCheckedCompositeToken v2Token,
            out MalformedChangeFeedContinuationTokenException failure)
        {
            v2Token = default(VersionedAndRidCheckedCompositeToken);
            failure = null;

            if (!(v1Token.ContinuationToken is CosmosArray cosmosArray))
            {
                failure = new MalformedChangeFeedContinuationTokenException(
                    message: $"Failed to parse get array continuation token: {rawContinuation}.");
                return false;
            }

            List<CosmosElement> changeFeedTokensV2 = new List<CosmosElement>();
            foreach (CosmosElement arrayItem in cosmosArray)
            {
                if (!(arrayItem is CosmosObject cosmosObject))
                {
                    failure = new MalformedChangeFeedContinuationTokenException(
                        message: $"Failed to parse get object in composite continuation: {rawContinuation}.");
                    return false;
                }

                if (!cosmosObject.TryGetValue("range", out CosmosElement rangeItem))
                {
                    // Names the property that is actually missing ("range", not "token").
                    failure = new MalformedChangeFeedContinuationTokenException(
                        message: $"Failed to parse range: {cosmosObject}.");
                    return false;
                }

                if (!(rangeItem is CosmosObject rangeElement))
                {
                    failure = new MalformedChangeFeedContinuationTokenException(
                        message: $"Failed to parse get object in composite continuation: {rawContinuation}.");
                    return false;
                }

                if (!rangeElement.TryGetValue("min", out CosmosString min))
                {
                    failure = new MalformedChangeFeedContinuationTokenException(
                        message: $"Failed to parse start of range: {cosmosObject}.");
                    return false;
                }

                if (!rangeElement.TryGetValue("max", out CosmosString max))
                {
                    failure = new MalformedChangeFeedContinuationTokenException(
                        message: $"Failed to parse end of range: {cosmosObject}.");
                    return false;
                }

                if (!cosmosObject.TryGetValue("token", out CosmosElement token))
                {
                    failure = new MalformedChangeFeedContinuationTokenException(
                        message: $"Failed to parse token: {cosmosObject}.");
                    return false;
                }

                FeedRangeEpk feedRangeEpk = new FeedRangeEpk(new Documents.Routing.Range<string>(
                    min: min.Value,
                    max: max.Value,
                    isMinInclusive: true,
                    isMaxInclusive: false));

                // A null token means nothing has been read from this range yet.
                ChangeFeedState state = token is CosmosNull ? ChangeFeedState.Beginning() : ChangeFeedStateContinuation.Continuation(token);

                FeedRangeState<ChangeFeedState> feedRangeState = new FeedRangeState<ChangeFeedState>(feedRangeEpk, state);
                changeFeedTokensV2.Add(ChangeFeedFeedRangeStateSerializer.ToCosmosElement(feedRangeState));
            }

            CosmosArray changeFeedTokensArrayV2 = CosmosArray.Create(changeFeedTokensV2);

            v2Token = new VersionedAndRidCheckedCompositeToken(
                VersionedAndRidCheckedCompositeToken.Version.V2,
                changeFeedTokensArrayV2,
                v1Token.Rid);
            return true;
        }

        /// <summary>
        /// Moves the string-valued request properties into a header dictionary and leaves the remaining entries on the options.
        /// </summary>
        /// <param name="changeFeedRequestOptions">The request options to read from; may be null. Its <c>Properties</c> is replaced when non-null.</param>
        /// <returns>The string-valued properties keyed by name, or null when the options or their properties are null.</returns>
        private static Dictionary<string, string> ExtractStringPropertiesAsHeaders(ChangeFeedRequestOptions changeFeedRequestOptions)
        {
            if (changeFeedRequestOptions?.Properties == null)
            {
                return null;
            }

            Dictionary<string, string> additionalHeaders = new Dictionary<string, string>();
            Dictionary<string, object> nonStringHeaders = new Dictionary<string, object>();
            foreach (KeyValuePair<string, object> keyValuePair in changeFeedRequestOptions.Properties)
            {
                if (keyValuePair.Value is string stringValue)
                {
                    additionalHeaders[keyValuePair.Key] = stringValue;
                }
                else
                {
                    nonStringHeaders[keyValuePair.Key] = keyValuePair.Value;
                }
            }

            // Only the non-string entries stay on the options object.
            changeFeedRequestOptions.Properties = nonStringHeaders;
            return additionalHeaders;
        }