in smallpond/logical/planner.py [0:0]
def create_exec_plan(self, logical_plan: LogicalPlan, manifest_only_final_results=True) -> ExecutionPlan:
    """
    Build an ExecutionPlan from a logical plan by attaching sink nodes and generating tasks.

    The input plan is deep-copied, then extended with:

    * one ``DataSinkNode`` per distinct ``output_name``. Nodes that share a name
      share a single sink, written to ``<output root>/<output_name>``.
    * one final ``DataSinkNode`` (``is_final_node=True``), written to
      ``<output root>/FinalResults``.

    ``<output root>`` is ``runtime_ctx.final_output_path`` if it is truthy, otherwise
    ``runtime_ctx.output_root``. All sinks are attached to a new ``RootNode``, which is
    visited to generate the runtime tasks.

    Parameters
    ----------
    logical_plan
        The plan to execute. The caller's object is not modified; a deep copy is used.
    manifest_only_final_results
        Only relevant when ``runtime_ctx.final_output_path`` is None and the final sink
        is built directly over the root node. If True, the final sink type is
        ``"manifest"``; otherwise it is ``"link"``.

    Returns
    -------
    ExecutionPlan
        Wraps the single root task produced by ``self.visit`` and the rewritten logical plan.

    Warns
    -----
    UserWarning
        When several nodes share an ``output_name``, or when a node uses the name
        ``"FinalResults"``, which collides with the final results directory.
    """
    logical_plan = copy.deepcopy(logical_plan)
    # Directory under which every sink (named and final) writes its output.
    output_root = self.runtime_ctx.final_output_path or self.runtime_ctx.output_root
    final_results_path = os.path.join(output_root, "FinalResults")

    # If --output_path is specified, copy files to the output path;
    # otherwise, create manifest files only.
    copy_to_output = self.runtime_ctx.final_output_path is not None
    sink_type = "copy" if copy_to_output else "manifest"
    if copy_to_output:
        final_sink_type = "copy"
    elif manifest_only_final_results:
        final_sink_type = "manifest"
    else:
        final_sink_type = "link"

    # Group named output nodes; nodes with the same name share one DataSinkNode.
    nodes_groupby_output_name: Dict[str, List[Node]] = defaultdict(list)
    for node in logical_plan.nodes.values():
        if node.output_name is not None:
            if node.output_name in nodes_groupby_output_name:
                warnings.warn(f"{node} has duplicate output name: {node.output_name}")
            nodes_groupby_output_name[node.output_name].append(node)
    if "FinalResults" in nodes_groupby_output_name:
        # The named sink and the final sink would both write to <output root>/FinalResults.
        warnings.warn('output name "FinalResults" collides with the final results sink; both write to the same directory')

    sink_nodes: Dict[str, DataSinkNode] = {
        output_name: DataSinkNode(logical_plan.ctx, tuple(nodes), os.path.join(output_root, output_name), type=sink_type)
        for output_name, nodes in nodes_groupby_output_name.items()
    }

    # Choose the inputs and type of the final sink.
    root = logical_plan.root_node
    if (
        isinstance(root, ConsolidateNode)
        and len(root.input_deps) == 1
        and isinstance(partition_node := root.input_deps[0], EvenlyDistributedPartitionNode)
        and all(node.output_name is not None for node in partition_node.input_deps)
    ):
        # XXX: special-case optimization to avoid copying files twice.
        # The root consolidates a single EvenlyDistributedPartitionNode whose inputs are
        # all named, so they already have sinks: write a manifest over those sinks.
        # Deduplicate by name (order-preserving) so a sink shared by several inputs
        # is listed only once.
        input_names = dict.fromkeys(node.output_name for node in partition_node.input_deps)
        final_inputs = tuple(sink_nodes[name] for name in input_names)
        final_type = "manifest"
    elif (output_name := root.output_name) is not None:
        # The root node is itself named: write a manifest over its existing sink
        # instead of copying its files a second time.
        final_inputs = (sink_nodes[output_name],)
        final_type = "manifest"
    else:
        # Normal case: sink the root node's output directly.
        final_inputs = (root,)
        final_type = final_sink_type

    final_sink = DataSinkNode(
        logical_plan.ctx,
        final_inputs,
        output_path=final_results_path,
        type=final_type,
        is_final_node=True,
    )

    # Assemble all sinks under a new root node. The final sink is kept out of
    # `sink_nodes` so a user output named "FinalResults" is not silently dropped;
    # it is appended last, as in the no-collision ordering.
    root_node = RootNode(logical_plan.ctx, (*sink_nodes.values(), final_sink))
    logical_plan = LogicalPlan(logical_plan.ctx, root_node)
    # Generate tasks; the root node is expected to yield exactly one task.
    [root_task] = self.visit(root_node)
    # Print the logical plan with the generated runtime tasks.
    logger.info(f"logical plan:{os.linesep}{str(logical_plan)}")
    return ExecutionPlan(self.runtime_ctx, root_task, logical_plan)