fn pushdown_requirement_to_children()

in datafusion/core/src/physical_optimizer/sort_pushdown.rs [195:283]


/// Decides which sort requirements should be handed down to the children of `plan`.
///
/// Returns `Ok(None)` when the parent requirement is not pushed through `plan`.
/// Otherwise returns `Ok(Some(requirements))`, where every entry is the ordering
/// requested from one child and `None` means nothing is requested from that child.
///
/// # Errors
///
/// For a `SortMergeJoinExec`, a `DataFusionError::Plan` is returned when
/// `parent_required` is `None`. Errors coming from `shift_right_required` and
/// `try_pushdown_requirements_to_join` are propagated unchanged.
fn pushdown_requirement_to_children(
    plan: &Arc<dyn ExecutionPlan>,
    parent_required: Option<&[PhysicalSortRequirement]>,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
    const ERR_MSG: &str = "Expects parent requirement to contain something";
    let missing_requirement = || DataFusionError::Plan(ERR_MSG.to_string());
    let order_flags = plan.maintains_input_order();

    if is_window(plan) {
        // Only the first input of a window plan is inspected here; the indexing
        // below panics if the plan reports no children / no input orderings.
        let input_orderings = plan.required_input_ordering();
        let window_request = input_orderings[0].as_deref();
        let window_input = plan.children()[0].clone();
        let compatibility =
            determine_children_requirement(parent_required, window_request, window_input);
        return Ok(match compatibility {
            RequirementsCompatibility::Satisfy => {
                Some(vec![window_request.map(|req| req.to_vec())])
            }
            RequirementsCompatibility::Compatible(adjusted) => Some(vec![adjusted]),
            RequirementsCompatibility::NonCompatible => None,
        });
    }

    if is_union(plan) {
        // UnionExec has no real sort requirement for its inputs. The parent's
        // requirement is replicated for every child so that unnecessary SortExecs
        // further down the UnionExec can be corrected.
        let requirement = parent_required.map(|req| req.to_vec());
        let child_count = plan.children().len();
        return Ok(Some(vec![requirement; child_count]));
    }

    if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
        let left_columns_len = smj.left.schema().fields().len();
        // A sort-merge join cannot be handled without a parent requirement.
        let required = parent_required.ok_or_else(missing_requirement)?;
        let required_exprs =
            PhysicalSortRequirement::to_sort_exprs(required.iter().cloned());
        let source_side =
            expr_source_sides(&required_exprs, smj.join_type, left_columns_len);
        return match source_side {
            Some(JoinSide::Left) => try_pushdown_requirements_to_join(
                smj,
                parent_required,
                required_exprs,
                JoinSide::Left,
            ),
            Some(JoinSide::Right) => {
                // Number of join output columns that precede the right-side columns.
                let right_offset =
                    smj.schema().fields().len() - smj.right.schema().fields().len();
                let shifted = shift_right_required(required, right_offset)?;
                let shifted_exprs =
                    PhysicalSortRequirement::to_sort_exprs(shifted.iter().cloned());
                try_pushdown_requirements_to_join(
                    smj,
                    parent_required,
                    shifted_exprs,
                    JoinSide::Right,
                )
            }
            // The side the expressions come from cannot be decided, so nothing
            // is pushed down.
            _ => Ok(None),
        };
    }

    // No push down for leaf nodes and for plans that keep none of their input
    // orderings (both cases show up as "no `true` flag"). RepartitionExec is
    // always skipped as well, even though RepartitionExec(input_partition=1)
    // could maintain its input ordering: pushing down is not beneficial there.
    // TODO: Add support for Projection push down
    let pushdown_blocked = !order_flags.contains(&true)
        || plan.as_any().is::<RepartitionExec>()
        || plan.as_any().is::<FilterExec>()
        || plan.as_any().is::<ProjectionExec>()
        || is_limit(plan)
        || plan.as_any().is::<HashJoinExec>();
    if pushdown_blocked {
        return Ok(None);
    }

    // Children whose ordering is maintained inherit the parent's requirement;
    // the remaining children get no requirement.
    let per_child = order_flags
        .into_iter()
        .map(|keeps_order| {
            parent_required
                .filter(|_| keeps_order)
                .map(|req| req.to_vec())
        })
        .collect::<Vec<_>>();
    Ok(Some(per_child))
}