in datafusion/core/src/physical_optimizer/sort_enforcement.rs [596:700]
/// Analyzes whether the sort operator(s) feeding a window operator can be
/// removed and, if so, builds the replacement window plan.
///
/// # Arguments
///
/// * `sort_tree` - Tree whose leaves are the `SortExec` /
///   `SortPreservingMergeExec` operators that are candidates for removal.
/// * `window_exec` - The window operator; must be either a
///   `BoundedWindowAggExec` or a `WindowAggExec`.
///
/// # Returns
///
/// * `Ok(Some(plan))` with the rebuilt window operator when the sort can be
///   removed.
/// * `Ok(None)` when the sort cannot (or should not) be removed: some leaf
///   cannot skip its sort, the leaves disagree on the required flags, the
///   input is bounded and the search mode is not `Sorted`, or the window
///   expressions need to be reversed but at least one is not reversible.
///
/// # Errors
///
/// Returns a plan error if `window_exec` is neither window operator type or
/// carries no window expressions. Errors from `can_skip_sort`,
/// `remove_corresponding_sort_from_sub_plan`, `add_sort_above` and the window
/// operator constructors are propagated.
fn analyze_window_sort_removal(
    sort_tree: &mut ExecTree,
    window_exec: &Arc<dyn ExecutionPlan>,
) -> Result<Option<PlanWithCorrespondingSort>> {
    let (window_expr, partition_keys) =
        if let Some(exec) = window_exec.as_any().downcast_ref::<BoundedWindowAggExec>() {
            (exec.window_expr(), &exec.partition_keys)
        } else if let Some(exec) = window_exec.as_any().downcast_ref::<WindowAggExec>() {
            (exec.window_expr(), &exec.partition_keys)
        } else {
            return plan_err!(
                "Expects to receive either WindowAggExec or BoundedWindowAggExec"
            );
        };

    // Only the first window expression is consulted for its PARTITION BY and
    // ORDER BY clauses.
    // NOTE(review): presumably all expressions of one window operator share
    // these clauses -- confirm against the planner.
    let first_expr = match window_expr.first() {
        Some(expr) => expr,
        // Report an error instead of panicking on an out-of-bounds index.
        None => {
            return plan_err!(
                "Expects window operator to have at least one window expression"
            )
        }
    };
    let partitionby_exprs = first_expr.partition_by();
    let orderby_sort_keys = first_expr.order_by();

    // `search_flags` stores the return value of `can_skip_sort`:
    // - `None` means the `SortExec` cannot be removed.
    // - The `bool` stores whether the window expressions must be reversed to
    //   remove the `SortExec`.
    // - The `PartitionSearchMode` stores the mode in which the executor should
    //   work once the `SortExec` before it is removed.
    let mut search_flags = None;
    for sort_any in sort_tree.get_leaves() {
        // Variable `sort_any` will either be a `SortExec` or a
        // `SortPreservingMergeExec`, and both have a single child.
        // Therefore, we can use the 0th index without loss of generality.
        let sort_input = &sort_any.children()[0];
        let flags = can_skip_sort(partitionby_exprs, orderby_sort_keys, sort_input)?;
        let is_compatible =
            flags.is_some() && (search_flags.is_none() || search_flags == flags);
        if !is_compatible {
            // We can not skip the sort, or the requirements are not uniform
            // across leaves; sort removal is not possible.
            return Ok(None);
        }
        search_flags = flags;
    }
    let (should_reverse, partition_search_mode) = match search_flags {
        Some(flags) => flags,
        // No leaf was examined, hence nothing can be removed.
        None => return Ok(None),
    };

    let is_unbounded = unbounded_output(window_exec);
    if !is_unbounded && partition_search_mode != PartitionSearchMode::Sorted {
        // Executor has bounded input and `partition_search_mode` is not
        // `PartitionSearchMode::Sorted`; in this case removing the sort is
        // not helpful.
        return Ok(None);
    }

    let window_expr = if should_reverse {
        let reversed = window_expr
            .iter()
            .map(|e| e.get_reverse_expr())
            .collect::<Option<Vec<_>>>();
        match reversed {
            Some(exprs) => exprs,
            // At least one window expression has no reverse counterpart.
            None => return Ok(None),
        }
    } else {
        window_expr.to_vec()
    };

    let requires_single_partition = matches!(
        window_exec.required_input_distribution()[sort_tree.idx],
        Distribution::SinglePartition
    );
    let mut new_child =
        remove_corresponding_sort_from_sub_plan(sort_tree, requires_single_partition)?;
    let new_schema = new_child.schema();
    let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
    // If all window expressions can run with bounded memory, choose the
    // bounded window variant:
    let new_plan = if uses_bounded_memory {
        Arc::new(BoundedWindowAggExec::try_new(
            window_expr,
            new_child,
            new_schema,
            partition_keys.to_vec(),
            partition_search_mode,
        )?) as _
    } else {
        if partition_search_mode != PartitionSearchMode::Sorted {
            // For `WindowAggExec` to work correctly PARTITION BY columns should be sorted.
            // Hence, if `partition_search_mode` is not `PartitionSearchMode::Sorted` we
            // convert the input ordering such that it can work with
            // `PartitionSearchMode::Sorted` (add `SortExec`). Effectively,
            // `WindowAggExec` works only in `PartitionSearchMode::Sorted` mode.
            let reqs = window_exec
                .required_input_ordering()
                .swap_remove(0)
                .unwrap_or_default();
            let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
            add_sort_above(&mut new_child, sort_expr, None)?;
        }
        Arc::new(WindowAggExec::try_new(
            window_expr,
            new_child,
            new_schema,
            partition_keys.to_vec(),
        )?) as _
    };
    Ok(Some(PlanWithCorrespondingSort::new(new_plan)))
}