fn build_replacement()

in src/dataframe.rs [267:316]


fn build_replacement(
    plan: Arc<dyn ExecutionPlan>,
    prefetch_buffer_size: usize,
    partitions_per_worker: Option<usize>,
    isolate: bool,
    max_rows: usize,
    inner_batch_size: usize,
) -> Result<(Vec<Vec<usize>>, Arc<dyn ExecutionPlan>), DataFusionError> {
    let mut replacement = plan.clone();
    let children = plan.children();
    assert!(children.len() == 1, "Unexpected plan structure");

    let child = children[0];
    let partition_count = child.output_partitioning().partition_count();
    trace!(
        "build_replacement for {}, partition_count: {}",
        displayable(plan.as_ref()).one_line(),
        partition_count
    );

    let partition_groups = match partitions_per_worker {
        Some(p) => (0..partition_count)
            .chunks(p)
            .into_iter()
            .map(|chunk| chunk.collect())
            .collect(),
        None => vec![(0..partition_count).collect()],
    };

    if isolate && partition_groups.len() > 1 {
        let new_child = Arc::new(PartitionIsolatorExec::new(
            child.clone(),
            partitions_per_worker.unwrap(), // we know it is a Some, here.
        ));
        replacement = replacement.clone().with_new_children(vec![new_child])?;
    }
    // insert a coalescing batches here too so that we aren't sending
    // too small (or too big) of batches over the network
    replacement = Arc::new(MaxRowsExec::new(
        Arc::new(CoalesceBatchesExec::new(replacement, inner_batch_size)) as Arc<dyn ExecutionPlan>,
        max_rows,
    )) as Arc<dyn ExecutionPlan>;

    if prefetch_buffer_size > 0 {
        replacement = Arc::new(PrefetchExec::new(replacement, prefetch_buffer_size))
            as Arc<dyn ExecutionPlan>;
    }

    Ok((partition_groups, replacement))
}