fn calc_required_input_ordering()

in datafusion/core/src/physical_plan/aggregates/mod.rs [463:568]


/// Computes the input ordering an aggregation should require and, when doing so
/// helps avoid a sort, switches order-sensitive aggregates to their reverse form.
///
/// Candidate requirements are examined in this order: the reverse requirement
/// (`aggregator_reverse_reqs`, only when it is `Some`), then the naive
/// requirement (`aggregator_reqs`). The first candidate satisfied by the input's
/// existing ordering (checked together with the input's equivalence properties)
/// is chosen. If none is satisfied, the naive candidate, which is examined last,
/// is used.
///
/// Each candidate is turned into a concrete requirement as follows:
/// - When `aggregation_ordering` is `FullyOrdered` or `PartiallyOrdered`, the
///   requirement starts with the first `order_indices.len()` entries of the
///   input's output ordering (nothing if the input reports no ordering). In
///   first-stage modes, every aggregator requirement whose expression is not
///   already present is then appended.
/// - Otherwise, first-stage modes use the aggregator requirement as-is, and
///   other modes produce an empty requirement.
///
/// If the reverse candidate is chosen, each order-sensitive entry of
/// `aggr_exprs` is replaced by its `reverse_expr()`, and the entry of
/// `order_by_exprs` at the same position is replaced by its reversed ordering.
///
/// Returns `Ok(None)` when the chosen requirement is empty.
///
/// # Errors
///
/// Returns a plan error when the reverse candidate is chosen and an
/// order-sensitive aggregate has no reverse expression. Entries processed before
/// the failing one have already been rewritten at that point.
///
/// # Panics
///
/// Panics if, in `FullyOrdered`/`PartiallyOrdered` mode, the input's output
/// ordering has fewer than `order_indices.len()` entries.
fn calc_required_input_ordering(
    input: &Arc<dyn ExecutionPlan>,
    aggr_exprs: &mut [Arc<dyn AggregateExpr>],
    order_by_exprs: &mut [Option<LexOrdering>],
    aggregator_reqs: LexOrderingReq,
    aggregator_reverse_reqs: Option<LexOrderingReq>,
    aggregation_ordering: &mut Option<AggregationOrdering>,
    mode: &AggregateMode,
) -> Result<Option<LexOrderingReq>> {
    // Final and FinalPartitioned modes don't enforce ordering requirements
    // since order-sensitive aggregators handle such requirements during merging.
    let is_first_stage = mode.is_first_stage();
    // The values below do not depend on which candidate is being examined, so
    // they are computed once rather than once per candidate.
    let input_ordering = input.output_ordering();
    let existing_ordering = input_ordering.unwrap_or(&[]);
    // If the mode is FullyOrdered or PartiallyOrdered (i.e. we are running with
    // bounded memory, without breaking the pipeline), the section of the input
    // ordering that enables that mode must lead every requirement, so that we can
    // still run with bounded memory. `None` means no such prefix applies.
    let ordered_prefix = match aggregation_ordering {
        Some(AggregationOrdering {
            mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
            order_indices,
            ..
        }) => Some(match input_ordering {
            Some(ordering) => &ordering[0..order_indices.len()],
            None => &[],
        }),
        _ => None,
    };

    // If the existing ordering doesn't satisfy the requirement, we should do
    // calculations on the naive requirement (by convention; otherwise the final
    // plan will be unintuitive), even if reverse ordering is possible. Hence the
    // naive requirement is examined last: it wins whenever the reverse requirement
    // does not help remove a SortExec from the plan.
    // Each candidate is tagged with whether it is the reverse requirement.
    let candidates = match aggregator_reverse_reqs {
        Some(reverse_reqs) => vec![(true, reverse_reqs), (false, aggregator_reqs)],
        None => vec![(false, aggregator_reqs)],
    };

    let mut required_input_ordering = vec![];
    // Whether `required_input_ordering` was built from the reverse requirement.
    let mut reverse_req = false;
    for (is_reverse, aggregator_requirement) in candidates {
        required_input_ordering = match ordered_prefix {
            Some(prefix) => {
                let mut requirement =
                    PhysicalSortRequirement::from_sort_exprs(prefix.iter());
                if is_first_stage {
                    for req in aggregator_requirement {
                        // Skip expressions the requirement already orders by.
                        if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
                            requirement.push(req);
                        }
                    }
                }
                requirement
            }
            None if is_first_stage => aggregator_requirement,
            None => vec![],
        };
        reverse_req = is_reverse;
        // NOTE(review): if a candidate is empty (e.g. a non-first-stage mode
        // without an ordered group-by prefix) and the helper below treats an
        // empty requirement as satisfied, the reverse candidate is chosen even
        // though it gains nothing, so aggregates still get reversed. Confirm
        // this is intended.
        if ordering_satisfy_requirement_concrete(
            existing_ordering,
            &required_input_ordering,
            || input.equivalence_properties(),
            || input.ordering_equivalence_properties(),
        ) {
            break;
        }
    }

    // When the reverse requirement was chosen, every order-sensitive aggregate
    // (which, for the reverse candidate to exist, should all be reversible,
    // e.g. FIRST_VALUE / LAST_VALUE) is swapped for its reverse expression.
    // This lets it compute the same result while consuming the input in the
    // opposite direction.
    if reverse_req {
        for (aggr_expr, ob_expr) in aggr_exprs.iter_mut().zip(order_by_exprs.iter_mut()) {
            if !is_order_sensitive(aggr_expr) {
                continue;
            }
            match aggr_expr.reverse_expr() {
                Some(reverse) => {
                    *aggr_expr = reverse;
                    *ob_expr = ob_expr.as_ref().map(|obs| reverse_order_bys(obs));
                }
                None => {
                    return plan_err!(
                        "Aggregate expression should have a reverse expression"
                    );
                }
            }
        }
    }
    Ok((!required_input_ordering.is_empty()).then_some(required_input_ordering))
}