in datafusion/core/src/physical_plan/aggregates/mod.rs [463:568]
/// Determines the ordering requirement this aggregation places on its input.
///
/// Up to two candidate requirements are examined: the reverse requirement
/// (`aggregator_reverse_reqs`, when it is `Some`) followed by the forward
/// requirement (`aggregator_reqs`). The search stops at the first candidate
/// that the existing ordering of `input` already satisfies. If no candidate
/// is satisfied, the forward requirement wins because it is examined last.
///
/// When `aggregation_ordering` is in `FullyOrdered` or `PartiallyOrdered`
/// mode, the candidate is appended to the leading `order_indices.len()`
/// expressions of the existing input ordering (skipping expressions that are
/// already part of that prefix) instead of replacing it.
///
/// If `mode` is not a first-stage mode, the aggregator requirements are not
/// added at all; only the existing-ordering prefix (if any) is required.
///
/// # Side effects
///
/// If the reverse candidate is selected, every order-sensitive entry of
/// `aggr_exprs` is replaced in place by its `reverse_expr()`, and the
/// matching entry of `order_by_exprs` is replaced by `reverse_order_bys` of
/// itself. Entries are paired positionally via `zip`.
///
/// `aggregation_ordering` is only read here, even though it is taken by
/// mutable reference.
///
/// # Returns
///
/// `Ok(None)` when the resulting requirement is empty, otherwise
/// `Ok(Some(requirement))`.
///
/// # Errors
///
/// Returns a plan error when the reverse candidate is selected but an
/// order-sensitive aggregate expression has no reverse expression.
fn calc_required_input_ordering(
    input: &Arc<dyn ExecutionPlan>,
    aggr_exprs: &mut [Arc<dyn AggregateExpr>],
    order_by_exprs: &mut [Option<LexOrdering>],
    aggregator_reqs: LexOrderingReq,
    aggregator_reverse_reqs: Option<LexOrderingReq>,
    aggregation_ordering: &mut Option<AggregationOrdering>,
    mode: &AggregateMode,
) -> Result<Option<LexOrderingReq>> {
    let mut required_input_ordering = vec![];
    // Records whether `required_input_ordering` was built from
    // `aggregator_reverse_reqs` (true) or from `aggregator_reqs` (false).
    let mut reverse_req = false;

    // Loop-invariant inputs, computed once up front.
    // NOTE(review): assumes `output_ordering()` and `is_first_stage()` are
    // side-effect-free accessors, so hoisting them does not change behavior.
    let is_first_stage = mode.is_first_stage();
    let input_ordering = input.output_ordering();
    let existing_ordering = input_ordering.unwrap_or(&[]);

    // Without a reverse requirement there is no way to run the aggregators in
    // reverse, so only the forward requirement is analyzed.
    let aggregator_requirements =
        if let Some(aggregator_reverse_reqs) = aggregator_reverse_reqs {
            // The forward requirement is deliberately placed last: when neither
            // candidate is satisfied by the existing ordering, the loop ends on
            // the forward one, which keeps the final plan intuitive. The reverse
            // requirement is only used when it is actually satisfied (and thus
            // helps remove a sort from the plan).
            vec![(true, aggregator_reverse_reqs), (false, aggregator_reqs)]
        } else {
            vec![(false, aggregator_reqs)]
        };

    for (is_reverse, aggregator_requirement) in aggregator_requirements {
        if let Some(AggregationOrdering {
            // In FullyOrdered or PartiallyOrdered mode (i.e. running with
            // bounded memory, without breaking the pipeline) the aggregator
            // requirement is appended to the existing ordering so that the
            // aggregation can keep running with bounded memory.
            mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
            order_indices,
            ..
        }) = aggregation_ordering
        {
            // Section of the input ordering that enables the FullyOrdered or
            // PartiallyOrdered mode.
            // NOTE(review): the slice below panics if `order_indices` is longer
            // than the input ordering; presumably the caller guarantees it is
            // not -- confirm against where `AggregationOrdering` is built.
            let requirement_prefix = if let Some(ordering) = input_ordering {
                &ordering[..order_indices.len()]
            } else {
                &[]
            };
            let mut requirement =
                PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
            // Final and FinalPartitioned modes don't enforce ordering
            // requirements since order-sensitive aggregators handle such
            // requirements during merging.
            if is_first_stage {
                for req in aggregator_requirement {
                    // Skip expressions that are already part of the requirement.
                    if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
                        requirement.push(req);
                    }
                }
            }
            required_input_ordering = requirement;
        } else if is_first_stage {
            required_input_ordering = aggregator_requirement;
        }
        // Keep track of the direction from which `required_input_ordering`
        // was constructed.
        reverse_req = is_reverse;

        // If all the order-sensitive aggregate functions are reversible (e.g.
        // they are all FIRST_VALUE or LAST_VALUE), the aggregate expressions can
        // run either in the given required ordering or in its reverse. Both
        // possibilities are analyzed and the one satisfied by the existing
        // ordering is used, which avoids an extra sort step in the final plan.
        // If neither is satisfied, the given (forward) requirement is used.
        if ordering_satisfy_requirement_concrete(
            existing_ordering,
            &required_input_ordering,
            || input.equivalence_properties(),
            || input.ordering_equivalence_properties(),
        ) {
            break;
        }
    }

    // If `required_input_ordering` was built from the reverse requirement,
    // every order-sensitive `aggr_expr` must be reversed as well so that its
    // result is computed correctly on reversely ordered input.
    if reverse_req {
        for (aggr_expr, ob_expr) in aggr_exprs.iter_mut().zip(order_by_exprs.iter_mut())
        {
            if is_order_sensitive(aggr_expr) {
                if let Some(reverse) = aggr_expr.reverse_expr() {
                    *aggr_expr = reverse;
                    *ob_expr = ob_expr.as_ref().map(|obs| reverse_order_bys(obs));
                } else {
                    // Expressions visited before this one stay reversed; the
                    // error is returned immediately.
                    return plan_err!(
                        "Aggregate expression should have a reverse expression"
                    );
                }
            }
        }
    }

    Ok((!required_input_ordering.is_empty()).then_some(required_input_ordering))
}