in src/physical.rs [56:90]
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &datafusion::config::ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
debug!(
"optimizing physical plan:\n{}",
display_plan_with_partition_counts(&plan)
);
let mut stage_counter = 0;
let up = |plan: Arc<dyn ExecutionPlan>| {
if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
|| plan.as_any().downcast_ref::<SortExec>().is_some()
|| plan.as_any().downcast_ref::<NestedLoopJoinExec>().is_some()
{
let stage = Arc::new(DFRayStageExec::new(plan, stage_counter));
stage_counter += 1;
Ok(Transformed::yes(stage as Arc<dyn ExecutionPlan>))
} else {
Ok(Transformed::no(plan))
}
};
let plan = plan.transform_up(up)?.data;
let final_plan =
Arc::new(DFRayStageExec::new(plan, stage_counter)) as Arc<dyn ExecutionPlan>;
debug!(
"optimized physical plan:\n{}",
display_plan_with_partition_counts(&final_plan)
);
Ok(final_plan)
}