static void create_two_stage_paths()

in src/backend/cdb/cdbgroupingpaths.c [163:534]


/*
 * Forward declarations of the static helpers used in this file.
 */

/* Two-stage (partial + final) path generation */
static void create_two_stage_paths(PlannerInfo *root,
								   cdb_agg_planning_context *ctx,
								   RelOptInfo *input_rel,
								   RelOptInfo *output_rel,
								   List *partial_pathlist);

/* Grouping-sets support: rollup clauses and the hidden GROUPINGSET_ID column */
static List *get_all_rollup_groupclauses(List *rollups);
static Index add_gsetid_tlist(List *tlist);
static SortGroupClause *create_gsetid_groupclause(Index groupref);
static List *strip_gsetid_from_pathkeys(Index gsetid_sortref,
										List *pathkeys);

/* First-stage (partial) aggregation paths */
static void add_first_stage_group_agg_path(PlannerInfo *root,
										   Path *path,
										   bool is_sorted,
										   cdb_agg_planning_context *ctx);
static void add_first_stage_hash_agg_path(PlannerInfo *root,
										  Path *path,
										  cdb_agg_planning_context *ctx);

/* Second-stage (final) aggregation paths */
static void add_second_stage_group_agg_path(PlannerInfo *root,
											Path *path,
											bool is_sorted,
											cdb_agg_planning_context *ctx,
											RelOptInfo *output_rel,
											bool is_partial);
static void add_second_stage_hash_agg_path(PlannerInfo *root,
										   Path *path,
										   cdb_agg_planning_context *ctx,
										   RelOptInfo *output_rel,
										   bool is_partial);

/* Paths for DISTINCT-qualified aggregates (DQAs) */
static void add_single_dqa_hash_agg_path(PlannerInfo *root,
										 Path *path,
										 cdb_agg_planning_context *ctx,
										 RelOptInfo *output_rel,
										 PathTarget *input_target,
										 List *dqa_group_clause,
										 double dNumDistinctGroups);
static void add_single_mixed_dqa_hash_agg_path(PlannerInfo *root,
											   Path *path,
											   cdb_agg_planning_context *ctx,
											   RelOptInfo *output_rel,
											   PathTarget *input_target,
											   List *dqa_group_clause);
static void add_multi_dqas_hash_agg_path(PlannerInfo *root,
										 Path *path,
										 cdb_agg_planning_context *ctx,
										 RelOptInfo *output_rel,
										 cdb_multi_dqas_info *info);

/* DQA analysis helpers */
static void fetch_single_dqa_info(PlannerInfo *root,
								  Path *path,
								  cdb_agg_planning_context *ctx,
								  cdb_multi_dqas_info *info);
static void fetch_multi_dqas_info(PlannerInfo *root,
								  Path *path,
								  cdb_agg_planning_context *ctx,
								  cdb_multi_dqas_info *info);
static DQAType recognize_dqa_type(cdb_agg_planning_context *ctx);

/* Miscellaneous */
static PathTarget *strip_aggdistinct(PathTarget *target);

/*
 * cdb_create_multistage_grouping_paths
 *
 * This is basically an extension of the function create_grouping_paths() from
 * planner.c.  It creates two- and three-stage Paths to implement aggregates
 * and/or GROUP BY.
 *
 * The caller already constructed Paths for one-stage plans, we are only
 * concerned about more complicated multi-stage plans here.
 *
 * Parameters:
 *	root					- planner state for the query
 *	input_rel				- relation whose paths feed the aggregation
 *	output_rel				- grouping relation; handed to the path-building
 *							  helpers below
 *	target					- PathTarget of the final aggregation result
 *	partial_grouping_target - PathTarget of the first (partial) stage; copied
 *							  and extended with GROUPINGSET_ID() when the
 *							  query has grouping sets
 *	havingQual				- HAVING qual, stored in the planning context
 *	dNumGroupsTotal			- estimated total number of groups
 *	agg_costs, agg_partial_costs, agg_final_costs
 *							- aggregate cost information, stored in the
 *							  planning context
 *	rollups, new_rollups	- grouping-set rollups (used for GROUPING SETS)
 *	strat					- aggregation strategy, stored in the context
 *	partial_pathlist		- passed through to create_two_stage_paths()
 *
 * Returns nothing; any new Paths are produced by the helper functions.
 */
void
cdb_create_multistage_grouping_paths(PlannerInfo *root,
								   RelOptInfo *input_rel,
								   RelOptInfo *output_rel,
								   PathTarget *target,
								   PathTarget *partial_grouping_target,
								   List *havingQual,
								   double dNumGroupsTotal,
								   const AggClauseCosts *agg_costs,
								   const AggClauseCosts *agg_partial_costs,
								   const AggClauseCosts *agg_final_costs,
								   List *rollups,
								   List *new_rollups,
								   AggStrategy strat,
								   List *partial_pathlist)
{
	Query	   *parse = root->parse;
	Path	   *cheapest_path = input_rel->cheapest_total_path;
	bool		has_ordered_aggs = root->numPureOrderedAggs > 0;
	cdb_agg_planning_context ctx;
	bool		can_sort;
	bool		can_hash;

	/* The caller should've checked these already */
	Assert(parse->hasAggs || parse->groupClause);

	/*
	 * This prohibition could be relaxed if we tracked missing combine
	 * functions per DQA and were willing to plan some DQAs as single and
	 * some as multiple phases.  Not currently, however.
	 */
	Assert(!root->hasNonCombine && !root->hasNonSerialAggs);
	Assert(root->config->gp_enable_multiphase_agg);

	/*
	 * Ordered aggregates need to run the transition function on the
	 * values in sorted order, which in turn translates into single phase
	 * aggregation.
	 */
	if (has_ordered_aggs)
		return;

	/*
	 * We are currently unwilling to redistribute a gathered intermediate
	 * across the cluster.  This might change one day.
	 */
	if (!CdbPathLocus_IsPartitioned(cheapest_path->locus))
		return;

	/*
	 * Is the input hashable / sortable? This is largely the same logic as in
	 * upstream create_grouping_paths(), but we can do hashing in limited ways
	 * even if there are DISTINCT aggs or grouping sets.
	 */
	can_sort = grouping_is_sortable(parse->groupClause);
	can_hash = (parse->groupClause != NIL &&
				root->numPureOrderedAggs == 0 &&
				grouping_is_hashable(parse->groupClause));

	/*
	 * Prepare a struct to hold the arguments and intermediate results
	 * across subroutines.
	 */
	memset(&ctx, 0, sizeof(ctx));
	ctx.can_sort = can_sort;
	ctx.can_hash = can_hash;
	ctx.target = target;
	ctx.dNumGroupsTotal = dNumGroupsTotal;
	ctx.agg_costs = agg_costs;
	ctx.agg_partial_costs = agg_partial_costs;
	ctx.agg_final_costs = agg_final_costs;
	ctx.rollups = rollups;
	ctx.new_rollups = new_rollups;
	ctx.strat = strat;

	ctx.hasAggs = parse->hasAggs;
	ctx.groupClause = parse->groupClause;
	ctx.groupingSets = parse->groupingSets;
	ctx.havingQual = havingQual;

	/*
	 * Create a partial rel similar to make_grouping_rel().  fetch_upper_rel()
	 * keys upper rels by relids, so a child ("other") input rel gets its own
	 * partial rel keyed by the child's relids.
	 *
	 * Look up the rel that will actually be used first, and only then copy
	 * the FDW / execution-location fields onto it.  Otherwise, for a child
	 * input rel the fields would land on the NULL-relids rel instead of the
	 * child's partial rel.
	 */
	if (IS_OTHER_REL(input_rel))
	{
		ctx.partial_rel = fetch_upper_rel(root, UPPERREL_CDB_FIRST_STAGE_GROUP_AGG,
										  input_rel->relids);
		ctx.partial_rel->reloptkind = RELOPT_OTHER_UPPER_REL;
	}
	else
	{
		ctx.partial_rel = fetch_upper_rel(root, UPPERREL_CDB_FIRST_STAGE_GROUP_AGG,
										  NULL);
	}
	ctx.partial_rel->fdwroutine = input_rel->fdwroutine;
	ctx.partial_rel->serverid = input_rel->serverid;
	ctx.partial_rel->segSeverids = input_rel->segSeverids;
	ctx.partial_rel->userid = input_rel->userid;
	ctx.partial_rel->exec_location = input_rel->exec_location;
	ctx.partial_rel->num_segments = input_rel->num_segments;

	ctx.partial_needed_pathkeys = root->group_pathkeys;
	ctx.partial_sort_pathkeys = root->group_pathkeys;
	/*
	 * CBDB parallel: Set consider_parallel for costs comparison.
	 * Else 2-stage agg with lower costs may lose to 1-stage agg.
	 */
	ctx.partial_rel->consider_parallel = output_rel->consider_parallel;

	ctx.group_tles = get_common_group_tles(target,
										   parse->groupClause,
										   ctx.rollups);

	/*
	 * For twostage grouping sets, we perform grouping sets aggregation in
	 * partial stage and normal aggregation in final stage.
	 *
	 * With this method, there is a problem, i.e., in the final stage of
	 * aggregation, we don't have a way to distinguish which tuple comes from
	 * which grouping set, which is needed for merging the partial results.
	 *
	 * For instance, suppose we have a table t(c1, c2, c3) containing one row
	 * (1, NULL, 3), and we are selecting agg(c3) group by grouping sets
	 * ((c1,c2), (c1)). Then there would be two tuples as partial results for
	 * that row, both are (1, NULL, agg(3)), one is from group by (c1,c2) and
	 * one is from group by (c1). If we cannot tell that the two tuples are
	 * from two different grouping sets, we will merge them incorrectly.
	 *
	 * So we add a hidden column 'GROUPINGSET_ID', representing grouping set
	 * id, to the targetlist of Partial Aggregate node, as well as to the sort
	 * keys and group keys for Finalize Aggregate node. So only tuples coming
	 * from the same grouping set can get merged in the final stage of
	 * aggregation. Note that we need to keep 'GROUPINGSET_ID' at the head of
	 * sort keys in final stage to ensure correctness.
	 *
	 * Below is a plan to illustrate this idea:
	 *
	 * # explain (costs off, verbose)
	 * select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1),(c2,c3));
	 *                                 QUERY PLAN
	 * ---------------------------------------------------------------------------
	 *  Finalize GroupAggregate
	 *    Output: c1, c2, c3, avg(c3)
	 *    Group Key: (GROUPINGSET_ID()), gstest.c1, gstest.c2, gstest.c3
	 *    ->  Sort
	 *          Output: c1, c2, c3, (PARTIAL avg(c3)), (GROUPINGSET_ID())
	 *          Sort Key: (GROUPINGSET_ID()), gstest.c1, gstest.c2, gstest.c3
	 *          ->  Gather Motion 3:1  (slice1; segments: 3)
	 *                Output: c1, c2, c3, (PARTIAL avg(c3)), (GROUPINGSET_ID())
	 *                ->  Partial GroupAggregate
	 *                      Output: c1, c2, c3, PARTIAL avg(c3), GROUPINGSET_ID()
	 *                      Group Key: gstest.c1, gstest.c2
	 *                      Group Key: gstest.c1
	 *                      Sort Key: gstest.c2, gstest.c3
	 *                        Group Key: gstest.c2, gstest.c3
	 *                      ->  Sort
	 *                            Output: c1, c2, c3
	 *                            Sort Key: gstest.c1, gstest.c2
	 *                            ->  Seq Scan on public.gstest
	 *                                  Output: c1, c2, c3
	 *  Optimizer: Postgres query optimizer
	 * (20 rows)
	 *
	 * Here, we prepare a target list and a corresponding list of SortGroupClauses
	 * for the result of the Partial Aggregate stage.
	 */
	if (parse->groupingSets)
	{
		GroupingSetId *gsetid;
		List	   *grouping_sets_tlist;
		SortGroupClause *gsetcl;
		List	   *gcls;
		List	   *tlist;

		gsetid = makeNode(GroupingSetId);
		grouping_sets_tlist = copyObject(root->processed_tlist);
		ctx.gsetid_sortref = add_gsetid_tlist(grouping_sets_tlist);

		gsetcl = create_gsetid_groupclause(ctx.gsetid_sortref);

		ctx.final_groupClause = lappend(copyObject(parse->groupClause), gsetcl);

		/* Add GROUPINGSET_ID() to a copy of the partial target, if missing */
		ctx.partial_grouping_target = copyObject(partial_grouping_target);
		if (!list_member(ctx.partial_grouping_target->exprs, gsetid))
			add_column_to_pathtarget(ctx.partial_grouping_target,
									 (Expr *) gsetid, ctx.gsetid_sortref);

		gcls = get_all_rollup_groupclauses(rollups);
		gcls = lappend(gcls, gsetcl);
		tlist = make_tlist_from_pathtarget(ctx.partial_grouping_target);

		/*
		 * The input to the final stage will be sorted by this. It includes the
		 * GROUPINGSET_ID() column.
		 */
		ctx.final_needed_pathkeys = make_pathkeys_for_sortclauses(root, gcls, tlist);
	}
	else
	{
		ctx.partial_grouping_target = partial_grouping_target;
		ctx.final_groupClause = parse->groupClause;
		ctx.final_needed_pathkeys = root->group_pathkeys;
		ctx.gsetid_sortref = 0;
	}
	ctx.final_sort_pathkeys = ctx.final_needed_pathkeys;
	ctx.final_group_tles = get_common_group_tles(ctx.partial_grouping_target,
												 ctx.final_groupClause,
												 NIL);
	ctx.partial_rel->reltarget = ctx.partial_grouping_target;

	/*
	 * All set, generate the two-stage paths.
	 */
	create_two_stage_paths(root, &ctx, input_rel, output_rel, partial_pathlist);

	/*
	 * Aggregates with DISTINCT arguments are more complicated, and are not
	 * handled by create_two_stage_paths() (except for the case of a single
	 * DQA that happens to be collocated with the input, see
	 * add_first_stage_group_agg_path()). Consider ways to implement them,
	 * too.
	 */
	if ((can_hash || parse->groupClause == NIL) &&
		!parse->groupingSets &&
		list_length(agg_costs->distinctAggrefs) > 0)
	{
		/*
		 * Try possible plans for DISTINCT-qualified aggregate.
		 *
		 * Zero 'info' with memset rather than an empty "= {}" initializer,
		 * which is only standard C as of C23.
		 */
		cdb_multi_dqas_info info;
		DQAType		type;

		memset(&info, 0, sizeof(info));
		type = recognize_dqa_type(&ctx);
		switch (type)
		{
		case SINGLE_DQA:
			{
				fetch_single_dqa_info(root, cheapest_path, &ctx, &info);

				add_single_dqa_hash_agg_path(root,
											 cheapest_path,
											 &ctx,
											 output_rel,
											 info.input_proj_target,
											 info.dqa_group_clause,
											 info.dNumDistinctGroups);
			}
			break;
		case SINGLE_DQA_WITHAGG:
			{
				fetch_single_dqa_info(root, cheapest_path, &ctx, &info);

				add_single_mixed_dqa_hash_agg_path(root,
												   cheapest_path,
												   &ctx,
												   output_rel,
												   info.input_proj_target,
												   info.dqa_group_clause);
			}
			break;
		case MULTI_DQAS:
			{
				ListCell   *lc;

				fetch_multi_dqas_info(root, cheapest_path, &ctx, &info);
				/*
				 * GPDB_14_MERGE_FIXME: We have done some copy job in
				 * make_partial_grouping_target, so that the agg references
				 * in plan is actually different from
				 * agg_partial_costs->distinctAggrefs. And it has to be
				 * different since we need to compute and set agg_expr_id for
				 * tuple split cases.
				 * However, we need to push multi dqa's filter to tuplesplit
				 * to get the correct result. And thus we need to remove the
				 * filter in aggref referenced by the plan.
				 *
				 * It's not that trivial to fix it perfectly. By manually
				 * removing the origin plan's aggfilter can work around
				 * this problem. We'll look at it again later.
				 */
				foreach(lc, root->agginfos)
				{
					AggInfo    *agginfo = (AggInfo *) lfirst(lc);
					Aggref	   *aggref = agginfo->representative_aggref;

					if (aggref->aggdistinct != NIL)
						aggref->aggfilter = NULL;
				}
				add_multi_dqas_hash_agg_path(root,
											 cheapest_path,
											 &ctx,
											 output_rel,
											 &info);
			}
			break;
		case MULTI_DQAS_WITHAGG:
			/* No multi-stage DQA path is generated here for this case. */
			break;
		default:
			break;
		}
	}
}