in datafusion/core/src/physical_optimizer/repartition.rs [166:275]
/// Recursively optimizes the partitioning of `plan`, processing children
/// before their parent so that repartitioning is introduced as early
/// (as close to the leaves) as possible.
///
/// After its children have been processed, a node's output is repartitioned
/// to `target_partitions` only when all of the following hold:
///
/// * `would_benefit` is set, i.e. the parent reported that it benefits from
///   partitioned input,
/// * `can_reorder` is set, i.e. the node's output order may be changed,
/// * the node is not the root of the plan (`is_root == false`),
/// * the node's output partitioning is `RoundRobinBatch` or
///   `UnknownPartitioning` with fewer than `target_partitions` partitions
///   (existing `Hash` partitioning is never overridden), and
/// * the node's statistics do not exactly report at most one row.
///
/// When repartitioning is allowed and `repartition_file_scans` is set, a
/// `ParquetExec` is replaced by its internally repartitioned form, and a
/// `CsvExec` is replaced when its `get_repartitioned` returns a plan. Both
/// receive `repartition_file_min_size`. In every other allowed case the node
/// is wrapped in a `RepartitionExec` using `RoundRobinBatch(target_partitions)`.
///
/// # Parameters
///
/// * `target_partitions` - desired number of output partitions.
/// * `plan` - the subtree to optimize.
/// * `is_root` - whether `plan` is the root of the whole plan; the root is
///   never repartitioned.
/// * `can_reorder` - whether the output order of `plan` may be changed.
/// * `would_benefit` - the parent's `benefits_from_input_partitioning()`.
/// * `repartition_file_scans` - allow Parquet/CSV scans to be split
///   internally instead of being wrapped in `RepartitionExec`.
/// * `repartition_file_min_size` - forwarded to the scans' `get_repartitioned`.
///
/// # Returns
///
/// `Transformed::Yes` if this node was rewritten or a rewrite was reported
/// for its subtree by `with_new_children_if_necessary`, otherwise
/// `Transformed::No` with the (unchanged) plan.
///
/// # Errors
///
/// Propagates errors from optimizing the children, from
/// `with_new_children_if_necessary`, and from `RepartitionExec::try_new`.
fn optimize_partitions(
    target_partitions: usize,
    plan: Arc<dyn ExecutionPlan>,
    is_root: bool,
    can_reorder: bool,
    would_benefit: bool,
    repartition_file_scans: bool,
    repartition_file_min_size: usize,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // Recurse into children bottom-up (attempt to repartition as
    // early as possible)
    let children = plan.children();
    let new_plan = if children.is_empty() {
        // leaf node - don't replace children
        Transformed::No(plan)
    } else {
        // These properties describe `plan` itself rather than any single
        // child, so compute them once instead of once per child.
        // Does plan itself (not its parent) require its input to be sorted
        // in some way?
        let required_input_ordering = plan_has_required_input_ordering(plan.as_ref());
        let maintains_input_order = plan.maintains_input_order();
        let benefits_from_input_partitioning = plan.benefits_from_input_partitioning();

        let new_children = children
            .into_iter()
            .enumerate()
            .map(|(idx, child)| {
                // A child may be reordered if:
                //   - it has no output ordering to preserve, or
                //   - `plan` does not require a particular input ordering, and
                //     either `plan` itself may be reordered (so whatever order
                //     it would propagate is irrelevant) or `plan` does not
                //     maintain this input's ordering.
                let can_reorder_child = child.output_ordering().is_none()
                    || (!required_input_ordering
                        && (can_reorder || !maintains_input_order[idx]));
                optimize_partitions(
                    target_partitions,
                    child,
                    false, // child is not root
                    can_reorder_child,
                    benefits_from_input_partitioning,
                    repartition_file_scans,
                    repartition_file_min_size,
                )
                .map(Transformed::into)
            })
            .collect::<Result<_>>()?;
        with_new_children_if_necessary(plan, new_children)?
    };
    let (new_plan, transformed) = new_plan.into_pair();

    // Decide whether to repartition the output of this node. The cheap flag
    // checks come first so that `statistics()` is only consulted when its
    // answer can actually change the outcome.
    let repartition_allowed = would_benefit
        && can_reorder
        // don't repartition root of the plan
        && !is_root
        // Apply only when the node has less than `target_partitions` amount of
        // concurrency. We don't want to introduce partitioning after hash
        // partitioning as the plan will likely depend on it.
        && match new_plan.output_partitioning() {
            RoundRobinBatch(x) | UnknownPartitioning(x) => x < target_partitions,
            Hash(_, _) => false,
        }
        // Don't need to apply when the returned row count is exactly known to
        // be at most 1
        && {
            let stats = new_plan.statistics();
            !stats.is_exact || stats.num_rows.map_or(true, |num_rows| num_rows > 1)
        };

    // If repartition is not allowed - return plan as it is
    if !repartition_allowed {
        return Ok(if transformed {
            Transformed::Yes(new_plan)
        } else {
            Transformed::No(new_plan)
        });
    }

    if repartition_file_scans {
        // For ParquetExec return internally repartitioned version of the plan
        if let Some(parquet_exec) = new_plan.as_any().downcast_ref::<ParquetExec>() {
            return Ok(Transformed::Yes(Arc::new(
                parquet_exec.get_repartitioned(target_partitions, repartition_file_min_size),
            )));
        }
        // CsvExec's `get_repartitioned` may return `None`; in that case fall
        // through to wrapping the plan in a RepartitionExec below.
        if let Some(csv_exec) = new_plan.as_any().downcast_ref::<CsvExec>() {
            if let Some(repartitioned_exec) =
                csv_exec.get_repartitioned(target_partitions, repartition_file_min_size)
            {
                return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
            }
        }
    }

    // Otherwise - return plan wrapped up in RepartitionExec
    Ok(Transformed::Yes(Arc::new(RepartitionExec::try_new(
        new_plan,
        RoundRobinBatch(target_partitions),
    )?)))
}