def create_exec_plan()

in smallpond/logical/planner.py [0:0]


    def create_exec_plan(self, logical_plan: LogicalPlan, manifest_only_final_results=True) -> ExecutionPlan:
        """
        Build an execution plan from a logical plan.

        The logical plan is deep-copied first, so the caller's object is not modified. A `DataSinkNode`
        is attached for every distinct `output_name` found in the plan (nodes sharing a name share one
        sink), plus one final sink named "FinalResults" for the root node. All sinks are gathered under a
        new `RootNode`, which is visited to generate the runtime tasks.

        Parameters
        ----------
        logical_plan
            The logical plan to convert.
        manifest_only_final_results, optional
            Only consulted when `runtime_ctx.final_output_path` is None and the root node's output is
            sinked directly (i.e. none of the special cases below applies): if True the final sink
            writes manifest files, otherwise it uses the "link" sink type.

        Returns
        -------
            An `ExecutionPlan` wrapping the generated root task and the rewritten logical plan.
        """
        logical_plan = copy.deepcopy(logical_plan)

        # every sink (named or final) writes below this directory
        output_base = self.runtime_ctx.final_output_path or self.runtime_ctx.output_root
        final_output_name = "FinalResults"

        # if --output_path is specified, copy files to the output path
        # otherwise, create manifest files only
        sink_type = "copy" if self.runtime_ctx.final_output_path is not None else "manifest"
        if self.runtime_ctx.final_output_path is not None:
            final_sink_type = "copy"
        elif manifest_only_final_results:
            final_sink_type = "manifest"
        else:
            final_sink_type = "link"

        # group named nodes by output name (same name share the same sink node)
        nodes_groupby_output_name: Dict[str, List[Node]] = defaultdict(list)
        for node in logical_plan.nodes.values():
            if node.output_name is None:
                continue
            if node.output_name in nodes_groupby_output_name:
                warnings.warn(f"{node} has duplicate output name: {node.output_name}")
            nodes_groupby_output_name[node.output_name].append(node)

        # "FinalResults" is also the key and directory of the final sink created below; a user output with
        # this name shares its output path and, in the normal case, its sink is replaced in `sink_nodes`.
        if final_output_name in nodes_groupby_output_name:
            warnings.warn(
                f"output name {final_output_name!r} is reserved for the final results sink; "
                f"nodes using this output name conflict with it"
            )

        # create one DataSinkNode per output name
        sink_nodes: Dict[str, DataSinkNode] = {
            output_name: DataSinkNode(logical_plan.ctx, tuple(nodes), os.path.join(output_base, output_name), type=sink_type)
            for output_name, nodes in nodes_groupby_output_name.items()
        }

        # choose the inputs and sink type of the final sink for the root node
        root = logical_plan.root_node
        if (
            isinstance(root, ConsolidateNode)
            and len(root.input_deps) == 1
            and isinstance(partition_node := root.input_deps[0], EvenlyDistributedPartitionNode)
            and all(node.output_name is not None for node in partition_node.input_deps)
        ):
            # XXX: special case optimization to avoid copying files twice:
            # the root is a ConsolidateNode over a single EvenlyDistributedPartitionNode whose inputs are all
            # named, so the final sink only writes manifest files over the already-created named sinks.
            final_inputs = tuple(sink_nodes[node.output_name] for node in partition_node.input_deps)
            final_type = "manifest"
        elif (root_output_name := root.output_name) is not None:
            # the root node is itself named: write manifest files over its named sink instead of copying again
            final_inputs = (sink_nodes[root_output_name],)
            final_type = "manifest"
        else:
            # normal case: sink the root node's output directly
            final_inputs = (root,)
            final_type = final_sink_type

        sink_nodes[final_output_name] = DataSinkNode(
            logical_plan.ctx,
            final_inputs,
            output_path=os.path.join(output_base, final_output_name),
            type=final_type,
            is_final_node=True,
        )

        # assemble sink nodes as new root node
        root_node = RootNode(logical_plan.ctx, tuple(sink_nodes.values()))
        logical_plan = LogicalPlan(logical_plan.ctx, root_node)

        # generate tasks
        [root_task] = self.visit(root_node)
        # print logical plan with the generated runtime tasks
        logger.info(f"logical plan:{os.linesep}{str(logical_plan)}")

        return ExecutionPlan(self.runtime_ctx, root_task, logical_plan)