in datafusion/core/src/physical_optimizer/sort_enforcement.rs [464:543]
fn ensure_sorting(
requirements: PlanWithCorrespondingSort,
) -> Result<Transformed<PlanWithCorrespondingSort>> {
// Perform naive analysis at the beginning -- remove already-satisfied sorts:
if requirements.plan.children().is_empty() {
return Ok(Transformed::No(requirements));
}
let plan = requirements.plan;
let mut children = plan.children();
let mut sort_onwards = requirements.sort_onwards;
if let Some(result) = analyze_immediate_sort_removal(&plan, &sort_onwards) {
return Ok(Transformed::Yes(result));
}
for (idx, (child, sort_onwards, required_ordering)) in izip!(
children.iter_mut(),
sort_onwards.iter_mut(),
plan.required_input_ordering()
)
.enumerate()
{
let physical_ordering = child.output_ordering();
match (required_ordering, physical_ordering) {
(Some(required_ordering), Some(physical_ordering)) => {
if !ordering_satisfy_requirement_concrete(
physical_ordering,
&required_ordering,
|| child.equivalence_properties(),
|| child.ordering_equivalence_properties(),
) {
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
let sort_expr =
PhysicalSortRequirement::to_sort_exprs(required_ordering);
add_sort_above(child, sort_expr, None)?;
if is_sort(child) {
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
} else {
*sort_onwards = None;
}
}
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec` to the plan.
let sort_expr = PhysicalSortRequirement::to_sort_exprs(required);
add_sort_above(child, sort_expr, None)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
(None, Some(_)) => {
// We have a `SortExec` whose effect may be neutralized by
// another order-imposing operator. Remove this sort.
if !plan.maintains_input_order()[idx] || is_union(&plan) {
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
}
}
(None, None) => {}
}
}
// For window expressions, we can remove some sorts when we can
// calculate the result in reverse:
if is_window(&plan) {
if let Some(tree) = &mut sort_onwards[0] {
if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
return Ok(Transformed::Yes(result));
}
}
} else if is_sort_preserving_merge(&plan)
&& children[0].output_partitioning().partition_count() <= 1
{
// This SortPreservingMergeExec is unnecessary, input already has a
// single partition.
return Ok(Transformed::Yes(PlanWithCorrespondingSort {
plan: children[0].clone(),
sort_onwards: vec![sort_onwards[0].clone()],
}));
}
Ok(Transformed::Yes(PlanWithCorrespondingSort {
plan: plan.with_new_children(children)?,
sort_onwards,
}))
}