fn pushdown_requirement_to_children()

in datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs [197:350]


fn pushdown_requirement_to_children(
    plan: &Arc<dyn ExecutionPlan>,
    parent_required: &LexRequirement,
) -> Result<Option<Vec<Option<LexRequirement>>>> {
    let maintains_input_order = plan.maintains_input_order();
    if is_window(plan) {
        let required_input_ordering = plan.required_input_ordering();
        let request_child = required_input_ordering[0].clone().unwrap_or_default();
        let child_plan = plan.children().swap_remove(0);

        match determine_children_requirement(parent_required, &request_child, child_plan)
        {
            RequirementsCompatibility::Satisfy => {
                let req = (!request_child.is_empty())
                    .then(|| LexRequirement::new(request_child.to_vec()));
                Ok(Some(vec![req]))
            }
            RequirementsCompatibility::Compatible(adjusted) => {
                // If parent requirements are more specific than output ordering
                // of the window plan, then we can deduce that the parent expects
                // an ordering from the columns created by window functions. If
                // that's the case, we block the pushdown of sort operation.
                if !plan
                    .equivalence_properties()
                    .ordering_satisfy_requirement(parent_required)
                {
                    return Ok(None);
                }

                Ok(Some(vec![adjusted]))
            }
            RequirementsCompatibility::NonCompatible => Ok(None),
        }
    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
        let sort_req = LexRequirement::from(
            sort_exec
                .properties()
                .output_ordering()
                .cloned()
                .unwrap_or_else(LexOrdering::default),
        );
        if sort_exec
            .properties()
            .eq_properties
            .requirements_compatible(parent_required, &sort_req)
        {
            debug_assert!(!parent_required.is_empty());
            Ok(Some(vec![Some(LexRequirement::new(
                parent_required.to_vec(),
            ))]))
        } else {
            Ok(None)
        }
    } else if plan.fetch().is_some()
        && plan.supports_limit_pushdown()
        && plan
            .maintains_input_order()
            .iter()
            .all(|maintain| *maintain)
    {
        let output_req = LexRequirement::from(
            plan.properties()
                .output_ordering()
                .cloned()
                .unwrap_or_else(LexOrdering::default),
        );
        // Push down through operator with fetch when:
        // - requirement is aligned with output ordering
        // - it preserves ordering during execution
        if plan
            .properties()
            .eq_properties
            .requirements_compatible(parent_required, &output_req)
        {
            let req = (!parent_required.is_empty())
                .then(|| LexRequirement::new(parent_required.to_vec()));
            Ok(Some(vec![req]))
        } else {
            Ok(None)
        }
    } else if is_union(plan) {
        // UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
        // propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
        let req = (!parent_required.is_empty()).then(|| parent_required.clone());
        Ok(Some(vec![req; plan.children().len()]))
    } else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
        // If the current plan is SortMergeJoinExec
        let left_columns_len = smj.left().schema().fields().len();
        let parent_required_expr = LexOrdering::from(parent_required.clone());
        match expr_source_side(
            parent_required_expr.as_ref(),
            smj.join_type(),
            left_columns_len,
        ) {
            Some(JoinSide::Left) => try_pushdown_requirements_to_join(
                smj,
                parent_required,
                parent_required_expr.as_ref(),
                JoinSide::Left,
            ),
            Some(JoinSide::Right) => {
                let right_offset =
                    smj.schema().fields.len() - smj.right().schema().fields.len();
                let new_right_required =
                    shift_right_required(parent_required, right_offset)?;
                let new_right_required_expr = LexOrdering::from(new_right_required);
                try_pushdown_requirements_to_join(
                    smj,
                    parent_required,
                    new_right_required_expr.as_ref(),
                    JoinSide::Right,
                )
            }
            _ => {
                // Can not decide the expr side for SortMergeJoinExec, can not push down
                Ok(None)
            }
        }
    } else if maintains_input_order.is_empty()
        || !maintains_input_order.iter().any(|o| *o)
        || plan.as_any().is::<RepartitionExec>()
        || plan.as_any().is::<FilterExec>()
        // TODO: Add support for Projection push down
        || plan.as_any().is::<ProjectionExec>()
        || pushdown_would_violate_requirements(parent_required, plan.as_ref())
    {
        // If the current plan is a leaf node or can not maintain any of the input ordering, can not pushed down requirements.
        // For RepartitionExec, we always choose to not push down the sort requirements even the RepartitionExec(input_partition=1) could maintain input ordering.
        // Pushing down is not beneficial
        Ok(None)
    } else if is_sort_preserving_merge(plan) {
        let new_ordering = LexOrdering::from(parent_required.clone());
        let mut spm_eqs = plan.equivalence_properties().clone();
        // Sort preserving merge will have new ordering, one requirement above is pushed down to its below.
        spm_eqs = spm_eqs.with_reorder(new_ordering);
        // Do not push-down through SortPreservingMergeExec when
        // ordering requirement invalidates requirement of sort preserving merge exec.
        if !spm_eqs.ordering_satisfy(&plan.output_ordering().cloned().unwrap_or_default())
        {
            Ok(None)
        } else {
            // Can push-down through SortPreservingMergeExec, because parent requirement is finer
            // than SortPreservingMergeExec output ordering.
            let req = (!parent_required.is_empty())
                .then(|| LexRequirement::new(parent_required.to_vec()));
            Ok(Some(vec![req]))
        }
    } else if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
        handle_hash_join(hash_join, parent_required)
    } else {
        handle_custom_pushdown(plan, parent_required, maintains_input_order)
    }
    // TODO: Add support for Projection push down
}