in src/dataframe.rs [267:316]
fn build_replacement(
plan: Arc<dyn ExecutionPlan>,
prefetch_buffer_size: usize,
partitions_per_worker: Option<usize>,
isolate: bool,
max_rows: usize,
inner_batch_size: usize,
) -> Result<(Vec<Vec<usize>>, Arc<dyn ExecutionPlan>), DataFusionError> {
let mut replacement = plan.clone();
let children = plan.children();
assert!(children.len() == 1, "Unexpected plan structure");
let child = children[0];
let partition_count = child.output_partitioning().partition_count();
trace!(
"build_replacement for {}, partition_count: {}",
displayable(plan.as_ref()).one_line(),
partition_count
);
let partition_groups = match partitions_per_worker {
Some(p) => (0..partition_count)
.chunks(p)
.into_iter()
.map(|chunk| chunk.collect())
.collect(),
None => vec![(0..partition_count).collect()],
};
if isolate && partition_groups.len() > 1 {
let new_child = Arc::new(PartitionIsolatorExec::new(
child.clone(),
partitions_per_worker.unwrap(), // we know it is a Some, here.
));
replacement = replacement.clone().with_new_children(vec![new_child])?;
}
// insert a coalescing batches here too so that we aren't sending
// too small (or too big) of batches over the network
replacement = Arc::new(MaxRowsExec::new(
Arc::new(CoalesceBatchesExec::new(replacement, inner_batch_size)) as Arc<dyn ExecutionPlan>,
max_rows,
)) as Arc<dyn ExecutionPlan>;
if prefetch_buffer_size > 0 {
replacement = Arc::new(PrefetchExec::new(replacement, prefetch_buffer_size))
as Arc<dyn ExecutionPlan>;
}
Ok((partition_groups, replacement))
}