in Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs [841:1072]
public static TryCatch<IQueryPipelineStage> MonadicCreate(
IDocumentContainer documentContainer,
ContainerQueryProperties containerQueryProperties,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList<FeedRangeEpk> targetRanges,
Cosmos.PartitionKey? partitionKey,
IReadOnlyList<OrderByColumn> orderByColumns,
QueryExecutionOptions queryPaginationOptions,
bool emitRawOrderByPayload,
int maxConcurrency,
CosmosElement continuationToken)
{
// TODO (brchon): For now we are not honoring non deterministic ORDER BY queries, since there is a bug in the continuation logic.
// We can turn it back on once the bug is fixed.
// This shouldn't hurt any query results.
List<(OrderByQueryPartitionRangePageAsyncEnumerator, OrderByContinuationToken)> enumeratorsAndTokens;
if (continuationToken == null)
{
// Start off all the partition key ranges with null continuation
SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec(
sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
sqlQuerySpec.Parameters);
enumeratorsAndTokens = targetRanges
.Select(range => (OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
rewrittenQueryForOrderBy,
new FeedRangeState<QueryState>(range, state: default),
partitionKey,
queryPaginationOptions,
TrueFilter,
PrefetchPolicy.PrefetchSinglePage),
(OrderByContinuationToken)null))
.ToList();
}
else
{
TryCatch<PartitionMapper.PartitionMapping<OrderByContinuationToken>> monadicGetOrderByContinuationTokenMapping = MonadicGetOrderByContinuationTokenMapping(
targetRanges,
continuationToken,
orderByColumns.Count);
if (monadicGetOrderByContinuationTokenMapping.Failed)
{
return TryCatch<IQueryPipelineStage>.FromException(monadicGetOrderByContinuationTokenMapping.Exception);
}
PartitionMapper.PartitionMapping<OrderByContinuationToken> partitionMapping = monadicGetOrderByContinuationTokenMapping.Result;
OrderByContinuationToken targetContinuationToken = partitionMapping.TargetMapping.Values.First();
int orderByResumeValueCount = 0;
IReadOnlyList<SqlQueryResumeValue> resumeValues;
IReadOnlyList<CosmosElement> orderByItems;
if (targetContinuationToken.ResumeValues != null)
{
// Use SqlQueryResumeValue for continuation if it is present.
resumeValues = targetContinuationToken.ResumeValues;
orderByItems = null;
orderByResumeValueCount = resumeValues.Count;
}
else
{
// If continuation token has only OrderByItems, check if it can be converted to SqlQueryResumeValue. This will
// help avoid re-writing the query. Conversion will work as long as the order by item type is a supported type.
orderByResumeValueCount = targetContinuationToken.OrderByItems.Count;
if (ContainsSupportedResumeTypes(targetContinuationToken.OrderByItems))
{
// Convert the order by items to SqlQueryResumeValue
List<SqlQueryResumeValue> generatedResumeValues = new List<SqlQueryResumeValue>(targetContinuationToken.OrderByItems.Count);
//foreach (CosmosElement orderByItem in orderByItems)
foreach (OrderByItem orderByItem in targetContinuationToken.OrderByItems)
{
generatedResumeValues.Add(SqlQueryResumeValue.FromOrderByValue(orderByItem.Item));
}
resumeValues = generatedResumeValues;
orderByItems = null;
}
else
{
orderByItems = targetContinuationToken.OrderByItems.Select(x => x.Item).ToList();
resumeValues = null;
}
}
if (orderByResumeValueCount != orderByColumns.Count)
{
return TryCatch<IQueryPipelineStage>.FromException(
new MalformedContinuationTokenException(
$"Order By Items from continuation token did not match the query text. " +
$"Order by item count: {orderByResumeValueCount} did not match column count {orderByColumns.Count()}. " +
$"Continuation token: {targetContinuationToken}"));
}
enumeratorsAndTokens = new List<(OrderByQueryPartitionRangePageAsyncEnumerator, OrderByContinuationToken)>();
if (resumeValues != null)
{
// Continuation contains resume values, so update SqlQuerySpec to include SqlQueryResumeFilter which
// will specify the resume point to the backend. This avoid having to re-write the query.
// Process partitions left of Target. The resume values in these partition have
// already been processed so exclude flag is set to true.
SqlQuerySpec leftQuerySpec = new SqlQuerySpec(
sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
sqlQuerySpec.Parameters,
new SqlQueryResumeFilter(resumeValues, null, true));
foreach (KeyValuePair<FeedRangeEpk, OrderByContinuationToken> kvp in partitionMapping.MappingLeftOfTarget)
{
FeedRangeEpk range = kvp.Key;
OrderByContinuationToken token = kvp.Value;
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
leftQuerySpec,
new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
queryPaginationOptions,
filter: null,
PrefetchPolicy.PrefetchSinglePage);
enumeratorsAndTokens.Add((remoteEnumerator, token));
}
// Process Target Partitions which is the last partition from which data has been returned.
// For this partition the Rid value needs to be set if present. Exclude flag is not set as the document
// matching the Rid will be skipped in SDK based on SkipCount value.
// Backend requests can contains both SqlQueryResumeFilter and ContinuationToken and the backend will pick
// the resume point that is bigger i.e. most restrictive
foreach (KeyValuePair<FeedRangeEpk, OrderByContinuationToken> kvp in partitionMapping.TargetMapping)
{
FeedRangeEpk range = kvp.Key;
OrderByContinuationToken token = kvp.Value;
SqlQuerySpec targetQuerySpec = new SqlQuerySpec(
sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
sqlQuerySpec.Parameters,
new SqlQueryResumeFilter(resumeValues, token?.Rid, false));
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
targetQuerySpec,
new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
queryPaginationOptions,
filter: null,
PrefetchPolicy.PrefetchSinglePage);
enumeratorsAndTokens.Add((remoteEnumerator, token));
}
// Process partitions right of target. The Resume value in these partitions have not been processed so the exclude value is set to false.
SqlQuerySpec rightQuerySpec = new SqlQuerySpec(
sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
sqlQuerySpec.Parameters,
new SqlQueryResumeFilter(resumeValues, null, false));
foreach (KeyValuePair<FeedRangeEpk, OrderByContinuationToken> kvp in partitionMapping.MappingRightOfTarget)
{
FeedRangeEpk range = kvp.Key;
OrderByContinuationToken token = kvp.Value;
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
rightQuerySpec,
new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
queryPaginationOptions,
filter: null,
PrefetchPolicy.PrefetchSinglePage);
enumeratorsAndTokens.Add((remoteEnumerator, token));
}
}
else
{
// If continuation token doesn't have resume values or if order by items cannot be converted to resume values then
// rewrite the query filter to get the correct resume point
ReadOnlyMemory<(OrderByColumn, CosmosElement)> columnAndItems = orderByColumns.Zip(orderByItems, (column, item) => (column, item)).ToArray();
// For ascending order-by, left of target partition has filter expression > value,
// right of target partition has filter expression >= value,
// and target partition takes the previous filter from continuation (or true if no continuation)
(string leftFilter, string targetFilter, string rightFilter) = GetFormattedFilters(columnAndItems);
List<(IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken>, string)> tokenMappingAndFilters = new List<(IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken>, string)>()
{
{ (partitionMapping.MappingLeftOfTarget, leftFilter) },
{ (partitionMapping.TargetMapping, targetFilter) },
{ (partitionMapping.MappingRightOfTarget, rightFilter) },
};
foreach ((IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken> tokenMapping, string filter) in tokenMappingAndFilters)
{
SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec(
sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: filter),
sqlQuerySpec.Parameters);
foreach (KeyValuePair<FeedRangeEpk, OrderByContinuationToken> kvp in tokenMapping)
{
FeedRangeEpk range = kvp.Key;
OrderByContinuationToken token = kvp.Value;
OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
containerQueryProperties,
rewrittenQueryForOrderBy,
new FeedRangeState<QueryState>(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null),
partitionKey,
queryPaginationOptions,
filter,
PrefetchPolicy.PrefetchSinglePage);
enumeratorsAndTokens.Add((remoteEnumerator, token));
}
}
}
}
StreamingOrderByCrossPartitionQueryPipelineStage stage = new StreamingOrderByCrossPartitionQueryPipelineStage(
documentContainer,
containerQueryProperties,
orderByColumns.Select(column => column.SortOrder).ToList(),
queryPaginationOptions,
emitRawOrderByPayload,
maxConcurrency,
enumeratorsAndTokens,
continuationToken == null ? null : new QueryState(continuationToken));
return TryCatch<IQueryPipelineStage>.FromResult(stage);
}