fn optimize_partitions()

in datafusion/core/src/physical_optimizer/repartition.rs [166:275]


/// Recursively attempts to increase the parallelism of `plan` by
/// repartitioning it (and its children) up to `target_partitions`.
///
/// Children are processed first, so repartitioning is introduced as deep
/// in the plan as possible. The node itself is then either returned as is,
/// replaced by an internally repartitioned file scan, or wrapped in a
/// round-robin `RepartitionExec`.
///
/// # Arguments
///
/// * `target_partitions` - desired number of output partitions.
/// * `plan` - the node to optimize.
/// * `is_root` - `true` for the root of the plan; the root itself is never
///   repartitioned (its children still are).
/// * `can_reorder` - whether the output of `plan` may be reordered;
///   repartitioning is skipped when this is `false`.
/// * `would_benefit` - whether the parent reported that it benefits from
///   input partitioning; repartitioning is skipped when this is `false`.
/// * `repartition_file_scans` - when `true`, `ParquetExec` and `CsvExec`
///   nodes are asked to repartition themselves via `get_repartitioned`
///   instead of being wrapped in a `RepartitionExec`.
/// * `repartition_file_min_size` - forwarded unchanged to
///   `get_repartitioned` of the file scans.
///
/// # Returns
///
/// `Transformed::Yes` when this node was repartitioned or when rebuilding
/// it from its optimized children reported a change; `Transformed::No`
/// otherwise.
///
/// # Errors
///
/// Propagates any error from the recursive calls, from
/// `with_new_children_if_necessary`, or from `RepartitionExec::try_new`.
fn optimize_partitions(
    target_partitions: usize,
    plan: Arc<dyn ExecutionPlan>,
    is_root: bool,
    can_reorder: bool,
    would_benefit: bool,
    repartition_file_scans: bool,
    repartition_file_min_size: usize,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // Recurse into children bottom-up (attempt to repartition as
    // early as possible)
    let children = plan.children();
    let new_plan = if children.is_empty() {
        // leaf node - don't replace children
        Transformed::No(plan)
    } else {
        // Both values depend only on `plan`, so compute them once rather
        // than once per child.
        //
        // Does plan itself (not its parent) require its input to
        // be sorted in some way?
        let required_input_ordering = plan_has_required_input_ordering(plan.as_ref());
        let children_would_benefit = plan.benefits_from_input_partitioning();

        let new_children = children
            .into_iter()
            .enumerate()
            .map(|(idx, child)| {
                // We can reorder a child if:
                //   - It has no ordering to preserve, or
                //   - `plan` has no required input ordering and either the
                //     output of `plan` may itself be reordered or `plan`
                //     does not maintain the ordering of this input.
                let can_reorder_child = child.output_ordering().is_none()
                    || (!required_input_ordering
                        && (can_reorder || !plan.maintains_input_order()[idx]));

                optimize_partitions(
                    target_partitions,
                    child,
                    false, // child is not root
                    can_reorder_child,
                    children_would_benefit,
                    repartition_file_scans,
                    repartition_file_min_size,
                )
                .map(Transformed::into)
            })
            .collect::<Result<_>>()?;
        with_new_children_if_necessary(plan, new_children)?
    };

    let (new_plan, transformed) = new_plan.into_pair();

    // Repartitioning only adds concurrency when the node currently exposes
    // fewer than `target_partitions` partitions.
    let has_spare_concurrency = match new_plan.output_partitioning() {
        RoundRobinBatch(x) | UnknownPartitioning(x) => x < target_partitions,
        // we don't want to introduce partitioning after hash partitioning
        // as the plan will likely depend on this
        Hash(_, _) => false,
    };

    // Don't need to apply when the returned row count is exactly known and
    // is not greater than 1
    let stats = new_plan.statistics();
    let may_exceed_one_row =
        !stats.is_exact || stats.num_rows.map_or(true, |num_rows| num_rows > 1);

    // The root of the plan is never repartitioned.
    let repartition_allowed = !is_root
        && would_benefit
        && can_reorder
        && has_spare_concurrency
        && may_exceed_one_row;

    // If repartition is not allowed - return plan as it is
    if !repartition_allowed {
        return Ok(if transformed {
            Transformed::Yes(new_plan)
        } else {
            Transformed::No(new_plan)
        });
    }

    // File scans can split their own input when `repartition_file_scans`
    // is set, which avoids an extra RepartitionExec on top of them.
    if repartition_file_scans {
        if let Some(parquet_exec) = new_plan.as_any().downcast_ref::<ParquetExec>() {
            return Ok(Transformed::Yes(Arc::new(
                parquet_exec
                    .get_repartitioned(target_partitions, repartition_file_min_size),
            )));
        }

        if let Some(csv_exec) = new_plan.as_any().downcast_ref::<CsvExec>() {
            // When the CSV scan returns `None`, fall through to the
            // generic RepartitionExec wrapping below.
            if let Some(repartitioned_exec) =
                csv_exec.get_repartitioned(target_partitions, repartition_file_min_size)
            {
                return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
            }
        }
    }

    // Otherwise - return plan wrapped up in RepartitionExec
    Ok(Transformed::Yes(Arc::new(RepartitionExec::try_new(
        new_plan,
        RoundRobinBatch(target_partitions),
    )?)))
}