in src/backend/cdb/cdbgroupingpaths.c [163:534]
static void create_two_stage_paths(PlannerInfo *root, cdb_agg_planning_context *ctx,
RelOptInfo *input_rel, RelOptInfo *output_rel, List *partial_pathlist);
static List *get_all_rollup_groupclauses(List *rollups);
static Index add_gsetid_tlist(List *tlist);
static SortGroupClause *create_gsetid_groupclause(Index groupref);
static List *strip_gsetid_from_pathkeys(Index gsetid_sortref, List *pathkeys);
static void add_first_stage_group_agg_path(PlannerInfo *root,
Path *path,
bool is_sorted,
cdb_agg_planning_context *ctx);
static void add_first_stage_hash_agg_path(PlannerInfo *root,
Path *path,
cdb_agg_planning_context *ctx);
static void add_second_stage_group_agg_path(PlannerInfo *root,
Path *path,
bool is_sorted,
cdb_agg_planning_context *ctx,
RelOptInfo *output_rel,
bool is_partial);
static void add_second_stage_hash_agg_path(PlannerInfo *root,
Path *path,
cdb_agg_planning_context *ctx,
RelOptInfo *output_rel,
bool is_partial);
static void add_single_dqa_hash_agg_path(PlannerInfo *root,
Path *path,
cdb_agg_planning_context *ctx,
RelOptInfo *output_rel,
PathTarget *input_target,
List *dqa_group_clause,
double dNumDistinctGroups);
static void add_single_mixed_dqa_hash_agg_path(PlannerInfo *root,
Path *path,
cdb_agg_planning_context *ctx,
RelOptInfo *output_rel,
PathTarget *input_target,
List *dqa_group_clause);
static void
add_multi_dqas_hash_agg_path(PlannerInfo *root,
Path *path,
cdb_agg_planning_context *ctx,
RelOptInfo *output_rel,
cdb_multi_dqas_info *info);
static void
fetch_single_dqa_info(PlannerInfo *root,
Path *path,
cdb_agg_planning_context *ctx,
cdb_multi_dqas_info *info);
static void
fetch_multi_dqas_info(PlannerInfo *root,
Path *path,
cdb_agg_planning_context *ctx,
cdb_multi_dqas_info *info);
static DQAType
recognize_dqa_type(cdb_agg_planning_context *ctx);
static PathTarget *
strip_aggdistinct(PathTarget *target);
/*
 * cdb_create_multistage_grouping_paths
 *
 * This is basically an extension of the function create_grouping_paths() from
 * planner.c. It creates two- and three-stage Paths to implement aggregates
 * and/or GROUP BY.
 *
 * The caller already constructed Paths for one-stage plans, we are only
 * concerned about more complicated multi-stage plans here.
 *
 * Returns nothing. 'output_rel' is handed to the path-building helpers called
 * below (presumably so they can add the generated Paths to it). The function
 * returns early, generating nothing, if the query has ordered aggregates or if
 * the cheapest input path is not partitioned.
 */
void
cdb_create_multistage_grouping_paths(PlannerInfo *root,
									 RelOptInfo *input_rel,
									 RelOptInfo *output_rel,
									 PathTarget *target,
									 PathTarget *partial_grouping_target,
									 List *havingQual,
									 double dNumGroupsTotal,
									 const AggClauseCosts *agg_costs,
									 const AggClauseCosts *agg_partial_costs,
									 const AggClauseCosts *agg_final_costs,
									 List *rollups,
									 List *new_rollups,
									 AggStrategy strat,
									 List *partial_pathlist)
{
	Query	   *parse = root->parse;
	Path	   *cheapest_path = input_rel->cheapest_total_path;
	bool		has_ordered_aggs = root->numPureOrderedAggs > 0;
	cdb_agg_planning_context ctx;
	bool		can_sort;
	bool		can_hash;

	/* The caller should've checked these already */
	Assert(parse->hasAggs || parse->groupClause);

	/*
	 * This prohibition could be relaxed if we tracked missing combine
	 * functions per DQA and were willing to plan some DQAs as single and
	 * some as multiple phases. Not currently, however.
	 */
	Assert(!root->hasNonCombine && !root->hasNonSerialAggs);
	Assert(root->config->gp_enable_multiphase_agg);

	/*
	 * Ordered aggregates need to run the transition function on the
	 * values in sorted order, which in turn translates into single phase
	 * aggregation.
	 */
	if (has_ordered_aggs)
		return;

	/*
	 * We are currently unwilling to redistribute a gathered intermediate
	 * across the cluster. This might change one day.
	 */
	if (!CdbPathLocus_IsPartitioned(cheapest_path->locus))
		return;

	/*
	 * Is the input hashable / sortable? This is largely the same logic as in
	 * upstream create_grouping_paths(), but we can do hashing in limited ways
	 * even if there are DISTINCT aggs or grouping sets.
	 *
	 * Upstream also requires "no ordered aggregates" for hashing; that case
	 * already returned above, so it need not be re-checked here.
	 */
	can_sort = grouping_is_sortable(parse->groupClause);
	can_hash = (parse->groupClause != NIL &&
				grouping_is_hashable(parse->groupClause));

	/*
	 * Prepare a struct to hold the arguments and intermediate results
	 * across subroutines.
	 */
	memset(&ctx, 0, sizeof(ctx));
	ctx.can_sort = can_sort;
	ctx.can_hash = can_hash;
	ctx.target = target;
	ctx.dNumGroupsTotal = dNumGroupsTotal;
	ctx.agg_costs = agg_costs;
	ctx.agg_partial_costs = agg_partial_costs;
	ctx.agg_final_costs = agg_final_costs;
	ctx.rollups = rollups;
	ctx.new_rollups = new_rollups;
	ctx.strat = strat;
	ctx.hasAggs = parse->hasAggs;
	ctx.groupClause = parse->groupClause;
	ctx.groupingSets = parse->groupingSets;
	ctx.havingQual = havingQual;

	/*
	 * Create the partial (first-stage) rel, similar to make_grouping_rel().
	 * For an "other" (child) input rel, use an upper rel keyed by the child's
	 * relids and mark it as an other-upper-rel; otherwise use the rel keyed
	 * by NULL relids.
	 */
	if (IS_OTHER_REL(input_rel))
	{
		ctx.partial_rel = fetch_upper_rel(root, UPPERREL_CDB_FIRST_STAGE_GROUP_AGG,
										  input_rel->relids);
		ctx.partial_rel->reloptkind = RELOPT_OTHER_UPPER_REL;
	}
	else
	{
		ctx.partial_rel = fetch_upper_rel(root, UPPERREL_CDB_FIRST_STAGE_GROUP_AGG,
										  NULL);
	}

	/*
	 * Inherit the foreign-server / execution-location properties of the
	 * input rel. This is done only after the partial rel has been chosen, so
	 * that a per-child partial rel receives them too (rather than the
	 * NULL-relids rel being overwritten with a child's values).
	 */
	ctx.partial_rel->fdwroutine = input_rel->fdwroutine;
	ctx.partial_rel->serverid = input_rel->serverid;
	ctx.partial_rel->segSeverids = input_rel->segSeverids;
	ctx.partial_rel->userid = input_rel->userid;
	ctx.partial_rel->exec_location = input_rel->exec_location;
	ctx.partial_rel->num_segments = input_rel->num_segments;

	ctx.partial_needed_pathkeys = root->group_pathkeys;
	ctx.partial_sort_pathkeys = root->group_pathkeys;

	/*
	 * CBDB parallel: Set consider_parallel for costs comparison.
	 * Else 2-stage agg with lower costs may lose to 1-stage agg.
	 */
	ctx.partial_rel->consider_parallel = output_rel->consider_parallel;
	ctx.group_tles = get_common_group_tles(target,
										   parse->groupClause,
										   ctx.rollups);

	/*
	 * For two-stage grouping sets, we perform grouping sets aggregation in
	 * the partial stage and normal aggregation in the final stage.
	 *
	 * This method has a problem: in the final stage of aggregation, we have
	 * no way to tell which grouping set a tuple came from, and that is
	 * needed to merge the partial results correctly.
	 *
	 * For instance, suppose we have a table t(c1, c2, c3) containing one row
	 * (1, NULL, 3), and we are selecting agg(c3) group by grouping sets
	 * ((c1,c2), (c1)). Then there would be two tuples as partial results for
	 * that row, both are (1, NULL, agg(3)), one is from group by (c1,c2) and
	 * one is from group by (c1). If we cannot tell that the two tuples are
	 * from two different grouping sets, we will merge them incorrectly.
	 *
	 * So we add a hidden column 'GROUPINGSET_ID', representing grouping set
	 * id, to the targetlist of Partial Aggregate node, as well as to the sort
	 * keys and group keys for Finalize Aggregate node. So only tuples coming
	 * from the same grouping set can get merged in the final stage of
	 * aggregation. Note that we need to keep 'GROUPINGSET_ID' at the head of
	 * sort keys in final stage to ensure correctness.
	 *
	 * Below is a plan to illustrate this idea:
	 *
	 * # explain (costs off, verbose)
	 * select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1),(c2,c3));
	 *                                 QUERY PLAN
	 * ---------------------------------------------------------------------------
	 *  Finalize GroupAggregate
	 *    Output: c1, c2, c3, avg(c3)
	 *    Group Key: (GROUPINGSET_ID()), gstest.c1, gstest.c2, gstest.c3
	 *    ->  Sort
	 *          Output: c1, c2, c3, (PARTIAL avg(c3)), (GROUPINGSET_ID())
	 *          Sort Key: (GROUPINGSET_ID()), gstest.c1, gstest.c2, gstest.c3
	 *          ->  Gather Motion 3:1  (slice1; segments: 3)
	 *                Output: c1, c2, c3, (PARTIAL avg(c3)), (GROUPINGSET_ID())
	 *                ->  Partial GroupAggregate
	 *                      Output: c1, c2, c3, PARTIAL avg(c3), GROUPINGSET_ID()
	 *                      Group Key: gstest.c1, gstest.c2
	 *                      Group Key: gstest.c1
	 *                      Sort Key: gstest.c2, gstest.c3
	 *                        Group Key: gstest.c2, gstest.c3
	 *                      ->  Sort
	 *                            Output: c1, c2, c3
	 *                            Sort Key: gstest.c1, gstest.c2
	 *                            ->  Seq Scan on public.gstest
	 *                                  Output: c1, c2, c3
	 *  Optimizer: Postgres query optimizer
	 * (20 rows)
	 *
	 * Here, we prepare a target list and a corresponding list of SortGroupClauses
	 * for the result of the Partial Aggregate stage.
	 */
	if (parse->groupingSets)
	{
		GroupingSetId *gsetid;
		List	   *grouping_sets_tlist;
		SortGroupClause *gsetcl;
		List	   *gcls;
		List	   *tlist;

		gsetid = makeNode(GroupingSetId);
		grouping_sets_tlist = copyObject(root->processed_tlist);
		ctx.gsetid_sortref = add_gsetid_tlist(grouping_sets_tlist);

		gsetcl = create_gsetid_groupclause(ctx.gsetid_sortref);

		ctx.final_groupClause = lappend(copyObject(parse->groupClause), gsetcl);

		ctx.partial_grouping_target = copyObject(partial_grouping_target);
		if (!list_member(ctx.partial_grouping_target->exprs, gsetid))
			add_column_to_pathtarget(ctx.partial_grouping_target,
									 (Expr *) gsetid, ctx.gsetid_sortref);

		gcls = get_all_rollup_groupclauses(rollups);
		gcls = lappend(gcls, gsetcl);
		tlist = make_tlist_from_pathtarget(ctx.partial_grouping_target);

		/*
		 * The input to the final stage will be sorted by this. It includes the
		 * GROUPINGSET_ID() column.
		 */
		ctx.final_needed_pathkeys = make_pathkeys_for_sortclauses(root, gcls, tlist);
	}
	else
	{
		ctx.partial_grouping_target = partial_grouping_target;
		ctx.final_groupClause = parse->groupClause;
		ctx.final_needed_pathkeys = root->group_pathkeys;
		ctx.gsetid_sortref = 0;
	}
	ctx.final_sort_pathkeys = ctx.final_needed_pathkeys;
	ctx.final_group_tles = get_common_group_tles(ctx.partial_grouping_target,
												 ctx.final_groupClause,
												 NIL);
	ctx.partial_rel->reltarget = ctx.partial_grouping_target;

	/*
	 * All set, generate the two-stage paths.
	 */
	create_two_stage_paths(root, &ctx, input_rel, output_rel, partial_pathlist);

	/*
	 * Aggregates with DISTINCT arguments are more complicated, and are not
	 * handled by create_two_stage_paths() (except for the case of a single
	 * DQA that happens to be collocated with the input, see
	 * add_first_stage_group_agg_path()). Consider ways to implement them,
	 * too.
	 */
	if ((can_hash || parse->groupClause == NIL) &&
		!parse->groupingSets &&
		list_length(agg_costs->distinctAggrefs) > 0)
	{
		/*
		 * Try possible plans for DISTINCT-qualified aggregate.
		 *
		 * 'info' is zero-filled with memset (as 'ctx' is above) instead of an
		 * empty "= {}" initializer, which is not valid ISO C before C23.
		 */
		cdb_multi_dqas_info info;
		DQAType		type = recognize_dqa_type(&ctx);

		memset(&info, 0, sizeof(info));

		switch (type)
		{
			case SINGLE_DQA:
				{
					fetch_single_dqa_info(root, cheapest_path, &ctx, &info);

					add_single_dqa_hash_agg_path(root,
												 cheapest_path,
												 &ctx,
												 output_rel,
												 info.input_proj_target,
												 info.dqa_group_clause,
												 info.dNumDistinctGroups);
				}
				break;
			case SINGLE_DQA_WITHAGG:
				{
					fetch_single_dqa_info(root, cheapest_path, &ctx, &info);

					add_single_mixed_dqa_hash_agg_path(root,
													   cheapest_path,
													   &ctx,
													   output_rel,
													   info.input_proj_target,
													   info.dqa_group_clause);
				}
				break;
			case MULTI_DQAS:
				{
					/* Declared first to avoid declaration-after-statement. */
					ListCell   *lc;

					fetch_multi_dqas_info(root, cheapest_path, &ctx, &info);

					/*
					 * GPDB_14_MERGE_FIXME: We have done some copy job in
					 * make_partial_grouping_target, so that the agg references
					 * in plan is actually different from
					 * agg_partial_costs->distinctAggrefs. And it has to be
					 * different since we need to compute and set agg_expr_id for
					 * tuple split cases.
					 * However, we need to push multi dqa's filter to tuplesplit
					 * to get the correct result. And thus we need to remove the
					 * filter in aggref referenced by the plan.
					 *
					 * It's not that trivial to fix it perfectly. By manually
					 * removing the origin plan's aggfilter can work around
					 * this problem. We'll look at it again later.
					 */
					foreach(lc, root->agginfos)
					{
						AggInfo    *agginfo = (AggInfo *) lfirst(lc);
						Aggref	   *aggref = agginfo->representative_aggref;

						if (aggref->aggdistinct != NIL)
							aggref->aggfilter = NULL;
					}

					add_multi_dqas_hash_agg_path(root,
												 cheapest_path,
												 &ctx,
												 output_rel,
												 &info);
				}
				break;
			case MULTI_DQAS_WITHAGG:
				/* No multi-stage plan is generated for this case here. */
				break;
			default:
				break;
		}
	}
}