fn analyze_window_sort_removal()

in datafusion/core/src/physical_optimizer/sort_enforcement.rs [596:700]


/// Analyzes whether the sort(s) recorded in `sort_tree` below `window_exec`
/// can be removed and, if so, builds the replacement window operator.
///
/// Every leaf of `sort_tree` is checked with `can_skip_sort`, using the
/// `PARTITION BY` / `ORDER BY` expressions of the first window expression.
/// Removal only proceeds when all leaves yield the same
/// `(should_reverse, partition_search_mode)` pair.
///
/// # Returns
///
/// * `Ok(Some(plan))` wrapping a new `BoundedWindowAggExec` (when every window
///   expression reports `uses_bounded_memory`) or a new `WindowAggExec`
///   otherwise, built on top of the sub-plan with the sort removed.
/// * `Ok(None)` when the sort cannot be skipped, when the leaves disagree,
///   when the input is bounded and the search mode is not
///   `PartitionSearchMode::Sorted`, or when a required reversal is not
///   available for some window expression.
///
/// # Errors
///
/// Returns a plan error if `window_exec` is neither a `BoundedWindowAggExec`
/// nor a `WindowAggExec`, or if it carries no window expressions. Errors
/// raised by the helper routines are propagated.
fn analyze_window_sort_removal(
    sort_tree: &mut ExecTree,
    window_exec: &Arc<dyn ExecutionPlan>,
) -> Result<Option<PlanWithCorrespondingSort>> {
    let (window_expr, partition_keys) =
        if let Some(exec) = window_exec.as_any().downcast_ref::<BoundedWindowAggExec>() {
            (exec.window_expr(), &exec.partition_keys)
        } else if let Some(exec) = window_exec.as_any().downcast_ref::<WindowAggExec>() {
            (exec.window_expr(), &exec.partition_keys)
        } else {
            return plan_err!(
                "Expects to receive either WindowAggExec or BoundedWindowAggExec"
            );
        };
    // Only the first window expression is consulted for the PARTITION BY and
    // ORDER BY clauses. Report an error instead of panicking if there is none.
    let first_expr = match window_expr.first() {
        Some(expr) => expr,
        None => {
            return plan_err!(
                "Expects window operator to contain at least one window expression"
            );
        }
    };
    let partitionby_exprs = first_expr.partition_by();
    let orderby_sort_keys = first_expr.order_by();

    // `search_flags` stores the return value of `can_skip_sort`:
    // - `None` means the `SortExec` cannot be removed.
    // - `Some((bool, PartitionSearchMode))` means it can: the `bool` tells
    //   whether the window expressions must be reversed to remove the sort,
    //   and the `PartitionSearchMode` is the mode the window executor should
    //   work in once the sort is gone.
    let mut search_flags = None;
    for sort_any in sort_tree.get_leaves() {
        // Variable `sort_any` will either be a `SortExec` or a
        // `SortPreservingMergeExec`, and both have a single child.
        // Therefore, we can use the 0th index without loss of generality.
        let sort_input = &sort_any.children()[0];
        let flags = can_skip_sort(partitionby_exprs, orderby_sort_keys, sort_input)?;
        // If this sort cannot be skipped, or its flags differ from the ones
        // seen on previous leaves, removal is not possible: return right away.
        if flags.is_none() || (search_flags.is_some() && search_flags != flags) {
            return Ok(None);
        }
        search_flags = flags;
    }
    let (should_reverse, partition_search_mode) = match search_flags {
        Some(flags) => flags,
        // No leaf produced flags, so there is nothing to remove.
        None => return Ok(None),
    };
    let is_unbounded = unbounded_output(window_exec);
    if !is_unbounded && partition_search_mode != PartitionSearchMode::Sorted {
        // The executor has bounded input and `partition_search_mode` is not
        // `PartitionSearchMode::Sorted`; removing the sort is not helpful.
        return Ok(None);
    }

    let new_window_expr = if should_reverse {
        // Collecting into `Option<Vec<_>>` yields `None` as soon as one
        // expression has no reverse counterpart.
        window_expr
            .iter()
            .map(|e| e.get_reverse_expr())
            .collect::<Option<Vec<_>>>()
    } else {
        Some(window_expr.to_vec())
    };
    let window_expr = match new_window_expr {
        Some(exprs) => exprs,
        None => return Ok(None),
    };

    let requires_single_partition = matches!(
        window_exec.required_input_distribution()[sort_tree.idx],
        Distribution::SinglePartition
    );
    let mut new_child =
        remove_corresponding_sort_from_sub_plan(sort_tree, requires_single_partition)?;
    let new_schema = new_child.schema();

    let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
    // If all window expressions can run with bounded memory, choose the
    // bounded window variant:
    let new_plan = if uses_bounded_memory {
        Arc::new(BoundedWindowAggExec::try_new(
            window_expr,
            new_child,
            new_schema,
            partition_keys.to_vec(),
            partition_search_mode,
        )?) as _
    } else {
        if partition_search_mode != PartitionSearchMode::Sorted {
            // For `WindowAggExec` to work correctly PARTITION BY columns should be sorted.
            // Hence, if `partition_search_mode` is not `PartitionSearchMode::Sorted` we should
            // convert the input ordering such that it can work with
            // `PartitionSearchMode::Sorted` (add a `SortExec`). Effectively `WindowAggExec`
            // works only in `PartitionSearchMode::Sorted` mode.
            let reqs = window_exec
                .required_input_ordering()
                .swap_remove(0)
                .unwrap_or_default();
            let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
            add_sort_above(&mut new_child, sort_expr, None)?;
        }
        Arc::new(WindowAggExec::try_new(
            window_expr,
            new_child,
            new_schema,
            partition_keys.to_vec(),
        )?) as _
    };
    Ok(Some(PlanWithCorrespondingSort::new(new_plan)))
}