in smallpond/dataframe.py [0:0]
def _get_or_create_tasks(self) -> List[Task]:
    """
    Get or create the tasks that compute this DataFrame's data.

    On the first call the logical plan ``self.plan`` is optimized and the result
    is cached on ``self.optimized_plan``; nodes that already have tasks in the
    session are passed to the optimizer as ``exclude_nodes``.

    If the session's node-to-tasks mapping already holds a (truthy) task list
    for the optimized plan, that list is returned as-is. Otherwise, when
    ``self.need_recompute`` is set, the plan's directory under the runtime
    context's completed-task directory is removed first, and a new ``Planner``
    builds the tasks while sharing the session's mapping.

    Returns:
        The list of tasks for the optimized plan.
    """
    session = self.session
    node_to_tasks = session._node_to_tasks

    # Optimize lazily and only once per DataFrame.
    if self.optimized_plan is None:
        logger.info(f"optimizing\n{LogicalPlan(session._ctx, self.plan)}")
        # Nodes already planned in this session are handed to the optimizer as
        # exclusions (presumably so it leaves them intact — verify against Optimizer).
        optimizer = Optimizer(exclude_nodes=set(node_to_tasks.keys()))
        self.optimized_plan = optimizer.visit(self.plan)
        logger.info(f"optimized\n{LogicalPlan(session._ctx, self.optimized_plan)}")
    plan = self.optimized_plan

    # Reuse tasks that were already created for this plan.
    # NOTE(review): an empty (falsy) cached list counts as a miss and triggers re-planning.
    existing = node_to_tasks.get(plan)
    if existing:
        return existing

    # A requested recompute discards every completed-task result recorded for this plan.
    if self.need_recompute:
        completed_dir = os.path.join(
            session._runtime_ctx.completed_task_dir,
            str(plan.id),
        )
        remove_path(completed_dir)
        logger.info(f"cleared all results of {plan!r}")

    # The planner writes into the session's own mapping, so the tasks it
    # creates are recorded at the session level.
    planner = Planner(session._runtime_ctx)
    planner.node_to_tasks = node_to_tasks
    return planner.visit(plan)