fn adjust_input_keys_ordering()

in datafusion/core/src/physical_optimizer/dist_enforcement.rs [144:300]


/// Adjusts the ordering of join / aggregate keys so that they line up with the key ordering
/// required by the parent, or pushes the parent's key requirement down to the children.
///
/// `requirements.required_key_ordering` holds the key ordering requested by the parent of
/// `requirements.plan`. Depending on the operator:
///
/// * `HashJoinExec` in `PartitionMode::Partitioned` and `SortMergeJoinExec` try to reorder
///   their `on` conditions via `reorder_partitioned_join_keys`.
/// * `HashJoinExec` in `PartitionMode::CollectLeft` and `CrossJoinExec` request nothing from
///   the left child. For the right child they request the parent requirement passed through
///   `shift_right_required` together with the left child's column count. The exception is
///   `CollectLeft` with `RightSemi`/`RightAnti`, where the requirement is forwarded unchanged,
///   and with `Left`/`LeftSemi`/`LeftAnti`/`Full`, where nothing is requested.
/// * `HashJoinExec` in `PartitionMode::Auto` clears the requirement.
/// * `AggregateExec` with a non-empty parent requirement: `AggregateMode::FinalPartitioned`
///   tries to reorder its keys via `reorder_aggregate_keys`; any other mode clears the
///   requirement.
/// * `ProjectionExec` maps the required expressions to their pre-projection form and forwards
///   them only if every one of them could be mapped; otherwise it clears the requirement.
/// * `RepartitionExec`, `CoalescePartitionsExec` and `WindowAggExec` clear the requirement.
/// * Any other operator forwards the parent requirement unchanged to every child.
///
/// # Returns
///
/// `Transformed::No(requirements)` only for an `AggregateExec` whose parent requirement is
/// empty; every other case returns `Transformed::Yes` with the updated node.
///
/// # Errors
///
/// Propagates any error returned by `reorder_partitioned_join_keys` or
/// `reorder_aggregate_keys`.
fn adjust_input_keys_ordering(
    requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
    // Borrow the parent requirement instead of cloning it for every node: only the branches
    // that store it in a new node need an owned copy. The borrow ends before `requirements`
    // is moved into `Transformed::No` at the bottom of the function.
    let parent_required = &requirements.required_key_ordering;
    let plan_any = requirements.plan.as_any();
    let transformed = if let Some(HashJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        mode,
        null_equals_null,
        ..
    }) = plan_any.downcast_ref::<HashJoinExec>()
    {
        match mode {
            PartitionMode::Partitioned => {
                // Rebuilds the join with reordered `on` conditions. The sort options
                // (`new_conditions.1`) are not used by the hash join, and an empty list is
                // passed in below.
                let join_constructor =
                    |new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
                        Ok(Arc::new(HashJoinExec::try_new(
                            left.clone(),
                            right.clone(),
                            new_conditions.0,
                            filter.clone(),
                            join_type,
                            PartitionMode::Partitioned,
                            *null_equals_null,
                        )?) as Arc<dyn ExecutionPlan>)
                    };
                Some(reorder_partitioned_join_keys(
                    requirements.plan.clone(),
                    parent_required,
                    on,
                    vec![],
                    &join_constructor,
                )?)
            }
            PartitionMode::CollectLeft => {
                let new_right_request = match join_type {
                    // Output columns of the right side follow the left side's columns, so
                    // the requirement is shifted by the left schema width.
                    JoinType::Inner | JoinType::Right => shift_right_required(
                        parent_required,
                        left.schema().fields().len(),
                    ),
                    JoinType::RightSemi | JoinType::RightAnti => {
                        Some(parent_required.clone())
                    }
                    JoinType::Left
                    | JoinType::LeftSemi
                    | JoinType::LeftAnti
                    | JoinType::Full => None,
                };

                // Push down requirements to the right side only; the left child gets `None`.
                Some(PlanWithKeyRequirements {
                    plan: requirements.plan.clone(),
                    required_key_ordering: vec![],
                    request_key_ordering: vec![None, new_right_request],
                })
            }
            PartitionMode::Auto => {
                // Can not satisfy, clear the current requirements and generate new empty requirements
                Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
            }
        }
    } else if let Some(CrossJoinExec { left, .. }) =
        plan_any.downcast_ref::<CrossJoinExec>()
    {
        let left_columns_len = left.schema().fields().len();
        // Push down requirements to the right side only; the left child gets `None`.
        Some(PlanWithKeyRequirements {
            plan: requirements.plan.clone(),
            required_key_ordering: vec![],
            request_key_ordering: vec![
                None,
                shift_right_required(parent_required, left_columns_len),
            ],
        })
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        join_type,
        sort_options,
        null_equals_null,
        ..
    }) = plan_any.downcast_ref::<SortMergeJoinExec>()
    {
        // Rebuilds the join with reordered `on` conditions and their matching sort options.
        let join_constructor =
            |new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
                Ok(Arc::new(SortMergeJoinExec::try_new(
                    left.clone(),
                    right.clone(),
                    new_conditions.0,
                    *join_type,
                    new_conditions.1,
                    *null_equals_null,
                )?) as Arc<dyn ExecutionPlan>)
            };
        Some(reorder_partitioned_join_keys(
            requirements.plan.clone(),
            parent_required,
            on,
            sort_options.clone(),
            &join_constructor,
        )?)
    } else if let Some(aggregate_exec) = plan_any.downcast_ref::<AggregateExec>() {
        if !parent_required.is_empty() {
            match aggregate_exec.mode {
                AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys(
                    requirements.plan.clone(),
                    parent_required,
                    aggregate_exec,
                )?),
                _ => Some(PlanWithKeyRequirements::new(requirements.plan.clone())),
            }
        } else {
            // Keep everything unchanged
            None
        }
    } else if let Some(ProjectionExec { expr, .. }) =
        plan_any.downcast_ref::<ProjectionExec>()
    {
        // For Projection, we need to transform the requirements to the columns before the
        // Projection and then push the requirements down.
        // Construct a mapping from new name to the original Column.
        let new_required = map_columns_before_projection(parent_required, expr);
        // A length mismatch means not every required expression could be mapped.
        if new_required.len() == parent_required.len() {
            Some(PlanWithKeyRequirements {
                plan: requirements.plan.clone(),
                required_key_ordering: vec![],
                request_key_ordering: vec![Some(new_required)],
            })
        } else {
            // Can not satisfy, clear the current requirements and generate new empty requirements
            Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
        }
    } else if plan_any.downcast_ref::<RepartitionExec>().is_some()
        || plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
        || plan_any.downcast_ref::<WindowAggExec>().is_some()
    {
        Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
    } else {
        // By default, push down the parent requirements to children
        let children_len = requirements.plan.children().len();
        Some(PlanWithKeyRequirements {
            plan: requirements.plan.clone(),
            required_key_ordering: vec![],
            request_key_ordering: vec![Some(parent_required.clone()); children_len],
        })
    };
    Ok(match transformed {
        Some(transformed) => Transformed::Yes(transformed),
        None => Transformed::No(requirements),
    })
}