in Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs [143:321]
public static TryCatch<IQueryPipelineStage> MonadicCreate(
IDocumentContainer documentContainer,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList<FeedRangeEpk> targetRanges,
PartitionKey? partitionKey,
QueryInfo queryInfo,
PrefetchPolicy prefetchPolicy,
ContainerQueryProperties containerQueryProperties,
int maxItemCount,
bool emitRawOrderByPayload,
bool isContinuationExpected,
int maxConcurrency,
CosmosElement requestContinuationToken)
{
// We need to compute the optimal initial page size for order-by queries
long optimalPageSize = maxItemCount;
if (queryInfo.HasOrderBy)
{
uint top;
if (queryInfo.HasTop && (queryInfo.Top.Value > 0))
{
top = queryInfo.Top.Value;
}
else if (queryInfo.HasLimit && (queryInfo.Limit.Value > 0))
{
top = (queryInfo.Offset ?? 0) + queryInfo.Limit.Value;
}
else
{
top = 0;
}
if (top > int.MaxValue)
{
throw new ArgumentOutOfRangeException(nameof(queryInfo.Top.Value));
}
if (top > 0)
{
// All partitions should initially fetch about 1/nth of the top value.
long pageSizeWithTop = (long)Math.Min(
Math.Ceiling(top / (double)targetRanges.Count) * PageSizeFactorForTop,
top);
optimalPageSize = Math.Min(pageSizeWithTop, optimalPageSize);
}
else if (isContinuationExpected)
{
optimalPageSize = (long)Math.Min(
Math.Ceiling(optimalPageSize / (double)targetRanges.Count) * PageSizeFactorForTop,
optimalPageSize);
}
}
QueryExecutionOptions queryPaginationOptions = new QueryExecutionOptions(pageSizeHint: (int)optimalPageSize);
Debug.Assert(
(optimalPageSize > 0) && (optimalPageSize <= int.MaxValue),
$"Invalid MaxItemCount {optimalPageSize}");
sqlQuerySpec = !string.IsNullOrEmpty(queryInfo.RewrittenQuery) ? new SqlQuerySpec(queryInfo.RewrittenQuery, sqlQuerySpec.Parameters) : sqlQuerySpec;
MonadicCreatePipelineStage monadicCreatePipelineStage;
if (queryInfo.HasOrderBy)
{
monadicCreatePipelineStage = (continuationToken) => OrderByCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer: documentContainer,
sqlQuerySpec: sqlQuerySpec,
targetRanges: targetRanges,
partitionKey: partitionKey,
orderByColumns: queryInfo
.OrderByExpressions
.Zip(queryInfo.OrderBy, (expression, sortOrder) => new OrderByColumn(expression, sortOrder)).ToList(),
queryPaginationOptions: queryPaginationOptions,
maxConcurrency: maxConcurrency,
nonStreamingOrderBy: queryInfo.HasNonStreamingOrderBy,
emitRawOrderByPayload: emitRawOrderByPayload,
continuationToken: continuationToken,
containerQueryProperties: containerQueryProperties);
}
else
{
monadicCreatePipelineStage = (continuationToken) => ParallelCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer: documentContainer,
sqlQuerySpec: sqlQuerySpec,
targetRanges: targetRanges,
queryPaginationOptions: queryPaginationOptions,
partitionKey: partitionKey,
containerQueryProperties: containerQueryProperties,
prefetchPolicy: prefetchPolicy,
maxConcurrency: maxConcurrency,
continuationToken: continuationToken);
}
if (queryInfo.HasAggregates && !queryInfo.HasGroupBy)
{
MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage;
monadicCreatePipelineStage = (continuationToken) => AggregateQueryPipelineStage.MonadicCreate(
queryInfo.Aggregates,
queryInfo.GroupByAliasToAggregateType,
queryInfo.GroupByAliases,
queryInfo.HasSelectValue,
continuationToken,
monadicCreateSourceStage);
}
if (queryInfo.HasDistinct)
{
MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage;
monadicCreatePipelineStage = (continuationToken) => DistinctQueryPipelineStage.MonadicCreate(
continuationToken,
monadicCreateSourceStage,
queryInfo.DistinctType);
}
if (queryInfo.HasGroupBy)
{
MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage;
monadicCreatePipelineStage = (continuationToken) => GroupByQueryPipelineStage.MonadicCreate(
continuationToken,
monadicCreateSourceStage,
queryInfo.Aggregates,
queryInfo.GroupByAliasToAggregateType,
queryInfo.GroupByAliases,
queryInfo.HasSelectValue,
(queryPaginationOptions ?? QueryExecutionOptions.Default).PageSizeLimit.GetValueOrDefault(int.MaxValue));
}
if (queryInfo.HasOffset)
{
Debug.Assert(queryInfo.Offset.Value <= int.MaxValue, "PipelineFactory Assert!", "Offset value must be <= int.MaxValue");
int offsetCount = (int)queryInfo.Offset.Value;
MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage;
monadicCreatePipelineStage = (continuationToken) => SkipQueryPipelineStage.MonadicCreate(
offsetCount,
continuationToken,
monadicCreateSourceStage);
}
if (queryInfo.HasLimit)
{
Debug.Assert(queryInfo.Limit.Value <= int.MaxValue, "PipelineFactory Assert!", "Limit value must be <= int.MaxValue");
int limitCount = (int)queryInfo.Limit.Value;
MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage;
monadicCreatePipelineStage = (continuationToken) => TakeQueryPipelineStage.MonadicCreateLimitStage(
limitCount,
continuationToken,
monadicCreateSourceStage);
}
if (queryInfo.HasTop)
{
Debug.Assert(queryInfo.Top.Value <= int.MaxValue, "PipelineFactory Assert!", "Top value must be <= int.MaxValue");
int topCount = (int)queryInfo.Top.Value;
MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage;
monadicCreatePipelineStage = (continuationToken) => TakeQueryPipelineStage.MonadicCreateTopStage(
topCount,
continuationToken,
monadicCreateSourceStage);
}
if (queryInfo.HasDCount)
{
MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage;
monadicCreatePipelineStage = (continuationToken) => DCountQueryPipelineStage.MonadicCreate(
queryInfo.DCountInfo,
continuationToken,
monadicCreateSourceStage);
}
return monadicCreatePipelineStage(requestContinuationToken)
.Try<IQueryPipelineStage>(onSuccess: stage => new SkipEmptyPageQueryPipelineStage(stage));
}