public static TryCatch MonadicCreate()

in Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs [841:1072]


            public static TryCatch<IQueryPipelineStage> MonadicCreate(
                IDocumentContainer documentContainer,
                ContainerQueryProperties containerQueryProperties,
                SqlQuerySpec sqlQuerySpec,
                IReadOnlyList<FeedRangeEpk> targetRanges,
                Cosmos.PartitionKey? partitionKey,
                IReadOnlyList<OrderByColumn> orderByColumns,
                QueryExecutionOptions queryPaginationOptions,
                bool emitRawOrderByPayload,
                int maxConcurrency,
                CosmosElement continuationToken)
            {
                // TODO (brchon): For now we are not honoring non deterministic ORDER BY queries, since there is a bug in the continuation logic.
                // We can turn it back on once the bug is fixed.
                // This shouldn't hurt any query results.

                List<(OrderByQueryPartitionRangePageAsyncEnumerator, OrderByContinuationToken)> enumeratorsAndTokens;
                if (continuationToken == null)
                {
                    // Start off all the partition key ranges with null continuation
                    SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec(
                        sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
                        sqlQuerySpec.Parameters);

                    enumeratorsAndTokens = targetRanges
                        .Select(range => (OrderByQueryPartitionRangePageAsyncEnumerator.Create(
                            documentContainer,
                            containerQueryProperties,
                            rewrittenQueryForOrderBy,
                            new FeedRangeState<QueryState>(range, state: default),
                            partitionKey,
                            queryPaginationOptions,
                            TrueFilter,
                            PrefetchPolicy.PrefetchSinglePage),
                            (OrderByContinuationToken)null))
                        .ToList();
                }
                else
                {
                    TryCatch<PartitionMapper.PartitionMapping<OrderByContinuationToken>> monadicGetOrderByContinuationTokenMapping = MonadicGetOrderByContinuationTokenMapping(
                        targetRanges,
                        continuationToken,
                        orderByColumns.Count);
                    if (monadicGetOrderByContinuationTokenMapping.Failed)
                    {
                        return TryCatch<IQueryPipelineStage>.FromException(monadicGetOrderByContinuationTokenMapping.Exception);
                    }

                    PartitionMapper.PartitionMapping<OrderByContinuationToken> partitionMapping = monadicGetOrderByContinuationTokenMapping.Result;

                    OrderByContinuationToken targetContinuationToken = partitionMapping.TargetMapping.Values.First();

                    int orderByResumeValueCount = 0;
                    IReadOnlyList<SqlQueryResumeValue> resumeValues;
                    IReadOnlyList<CosmosElement> orderByItems;
                    if (targetContinuationToken.ResumeValues != null)
                    {
                        // Use SqlQueryResumeValue for continuation if it is present.
                        resumeValues = targetContinuationToken.ResumeValues;
                        orderByItems = null;
                        orderByResumeValueCount = resumeValues.Count;
                    }
                    else
                    {
                        // If continuation token has only OrderByItems, check if it can be converted to SqlQueryResumeValue. This will
                        // help avoid re-writing the query. Conversion will work as long as the order by item type is a supported type. 
                        orderByResumeValueCount = targetContinuationToken.OrderByItems.Count;

                        if (ContainsSupportedResumeTypes(targetContinuationToken.OrderByItems))
                        {
                            // Convert the order by items to SqlQueryResumeValue
                            List<SqlQueryResumeValue> generatedResumeValues = new List<SqlQueryResumeValue>(targetContinuationToken.OrderByItems.Count);
                            //foreach (CosmosElement orderByItem in orderByItems)
                            foreach (OrderByItem orderByItem in targetContinuationToken.OrderByItems)
                            {
                                generatedResumeValues.Add(SqlQueryResumeValue.FromOrderByValue(orderByItem.Item));
                            }

                            resumeValues = generatedResumeValues;
                            orderByItems = null;
                        }
                        else
                        {
                            orderByItems = targetContinuationToken.OrderByItems.Select(x => x.Item).ToList();
                            resumeValues = null;
                        }
                    }

                    if (orderByResumeValueCount != orderByColumns.Count)
                    {
                        return TryCatch<IQueryPipelineStage>.FromException(
                            new MalformedContinuationTokenException(
                                $"Order By Items from continuation token did not match the query text. " +
                                $"Order by item count: {orderByResumeValueCount} did not match column count {orderByColumns.Count()}. " +
                                $"Continuation token: {targetContinuationToken}"));
                    }

                    enumeratorsAndTokens = new List<(OrderByQueryPartitionRangePageAsyncEnumerator, OrderByContinuationToken)>();
                    if (resumeValues != null)
                    {
                        // Continuation contains resume values, so update SqlQuerySpec to include SqlQueryResumeFilter which
                        // will specify the resume point to the backend. This avoid having to re-write the query. 

                        // Process partitions left of Target. The resume values in these partition have
                        // already been processed so exclude flag is set to true.
                        SqlQuerySpec leftQuerySpec = new SqlQuerySpec(
                            sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
                            sqlQuerySpec.Parameters,
                            new SqlQueryResumeFilter(resumeValues, null, true));

                        foreach (KeyValuePair<FeedRangeEpk, OrderByContinuationToken> kvp in partitionMapping.MappingLeftOfTarget)
                        {
                            FeedRangeEpk range = kvp.Key;
                            OrderByContinuationToken token = kvp.Value;
                            OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
                                documentContainer,
                                containerQueryProperties,
                                leftQuerySpec,
                                new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
                                partitionKey,
                                queryPaginationOptions,
                                filter: null,
                                PrefetchPolicy.PrefetchSinglePage);

                            enumeratorsAndTokens.Add((remoteEnumerator, token));
                        }

                        // Process Target Partitions which is the last partition from which data has been returned. 
                        // For this partition the Rid value needs to be set if present. Exclude flag is not set as the document
                        // matching the Rid will be skipped in SDK based on SkipCount value. 
                        // Backend requests can contains both SqlQueryResumeFilter and ContinuationToken and the backend will pick
                        // the resume point that is bigger i.e. most restrictive
                        foreach (KeyValuePair<FeedRangeEpk, OrderByContinuationToken> kvp in partitionMapping.TargetMapping)
                        {
                            FeedRangeEpk range = kvp.Key;
                            OrderByContinuationToken token = kvp.Value;

                            SqlQuerySpec targetQuerySpec = new SqlQuerySpec(
                                sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
                                sqlQuerySpec.Parameters,
                                new SqlQueryResumeFilter(resumeValues, token?.Rid, false));

                            OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
                                documentContainer,
                                containerQueryProperties,
                                targetQuerySpec,
                                new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
                                partitionKey,
                                queryPaginationOptions,
                                filter: null,
                                PrefetchPolicy.PrefetchSinglePage);

                            enumeratorsAndTokens.Add((remoteEnumerator, token));
                        }

                        // Process partitions right of target. The Resume value in these partitions have not been processed so the exclude value is set to false.
                        SqlQuerySpec rightQuerySpec = new SqlQuerySpec(
                            sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
                            sqlQuerySpec.Parameters,
                            new SqlQueryResumeFilter(resumeValues, null, false));

                        foreach (KeyValuePair<FeedRangeEpk, OrderByContinuationToken> kvp in partitionMapping.MappingRightOfTarget)
                        {
                            FeedRangeEpk range = kvp.Key;
                            OrderByContinuationToken token = kvp.Value;
                            OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
                                documentContainer,
                                containerQueryProperties,
                                rightQuerySpec,
                                new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
                                partitionKey,
                                queryPaginationOptions,
                                filter: null,
                                PrefetchPolicy.PrefetchSinglePage);

                            enumeratorsAndTokens.Add((remoteEnumerator, token));
                        }
                    }
                    else
                    {
                        // If continuation token doesn't have resume values or if order by items cannot be converted to resume values then
                        // rewrite the query filter to get the correct resume point
                        ReadOnlyMemory<(OrderByColumn, CosmosElement)> columnAndItems = orderByColumns.Zip(orderByItems, (column, item) => (column, item)).ToArray();

                        // For ascending order-by, left of target partition has filter expression > value,
                        // right of target partition has filter expression >= value, 
                        // and target partition takes the previous filter from continuation (or true if no continuation)
                        (string leftFilter, string targetFilter, string rightFilter) = GetFormattedFilters(columnAndItems);
                        List<(IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken>, string)> tokenMappingAndFilters = new List<(IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken>, string)>()
                    {
                        { (partitionMapping.MappingLeftOfTarget, leftFilter) },
                        { (partitionMapping.TargetMapping, targetFilter) },
                        { (partitionMapping.MappingRightOfTarget, rightFilter) },
                    };

                        foreach ((IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken> tokenMapping, string filter) in tokenMappingAndFilters)
                        {
                            SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec(
                                sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: filter),
                                sqlQuerySpec.Parameters);

                            foreach (KeyValuePair<FeedRangeEpk, OrderByContinuationToken> kvp in tokenMapping)
                            {
                                FeedRangeEpk range = kvp.Key;
                                OrderByContinuationToken token = kvp.Value;
                                OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
                                    documentContainer,
                                    containerQueryProperties,
                                    rewrittenQueryForOrderBy,
                                    new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
                                    partitionKey,
                                    queryPaginationOptions,
                                    filter,
                                    PrefetchPolicy.PrefetchSinglePage);

                                enumeratorsAndTokens.Add((remoteEnumerator, token));
                            }
                        }
                    }
                }

                StreamingOrderByCrossPartitionQueryPipelineStage stage = new StreamingOrderByCrossPartitionQueryPipelineStage(
                    documentContainer,
                    containerQueryProperties,
                    orderByColumns.Select(column => column.SortOrder).ToList(),
                    queryPaginationOptions,
                    emitRawOrderByPayload,
                    maxConcurrency,
                    enumeratorsAndTokens,
                    continuationToken == null ? null : new QueryState(continuationToken));
                return TryCatch<IQueryPipelineStage>.FromResult(stage);
            }