in smallpond/execution/scheduler.py [0:0]
def run(self) -> bool:
    """
    Run one epoch of the scheduler from startup to shutdown.

    The method renames the current process after the epoch, optionally starts
    a cProfile session, (re)enqueues the initial work, drives `sched_loop`
    and finally stops the executors and cleans up.

    Returns
    -------
    bool
        The value of `self.success` once the scheduler has exited.
    """
    mp.current_process().name = f"SchedulerMainProcess#{self.sched_epoch}"
    logger.info(f"start to run scheduler #{self.sched_epoch} on {socket.gethostname()}")

    # Profiling is opt-in via the runtime context; `profiler` stays None otherwise.
    profiler = cProfile.Profile() if self.ctx.enable_profiling else None
    if profiler is not None:
        profiler.enable()

    # The same thread pool is handed to every helper below that accepts one.
    with ThreadPoolExecutor(32) as pool:
        # Reset the per-run bookkeeping.
        self.sched_running = True
        self.sched_start_time = time.time()
        self.last_executor_probe_time = 0
        self.last_state_notify_time = 0
        # Any epoch after the first one turns on retry prioritization.
        self.prioritize_retry |= self.sched_epoch > 0

        # Tasks still sitting in either queue are drained and enqueued again.
        if self.local_queue or self.sched_queue:
            requeued = [work for work in self.local_queue + self.sched_queue if isinstance(work, Task)]
            self.local_queue.clear()
            self.sched_queue.clear()
            logger.info(f"requeue {len(requeued)} pending tasks with latest epoch #{self.sched_epoch}: {requeued[:3]} ...")
            self.try_enqueue(requeued)

        # The very first epoch starts from the leaves of the execution plan.
        if self.sched_epoch == 0:
            leaves = self.exec_plan.leaves
            logger.info(f"enqueue {len(leaves)} leaf tasks: {leaves[:3]} ...")
            self.try_enqueue(leaves)

        # Keep processing finished tasks until a pass reports none.
        self.log_overall_progress()
        while (finished_count := self.process_finished_tasks(pool)) > 0:
            logger.info(f"processed {finished_count} finished tasks during startup")
            self.log_overall_progress()

        # Tasks still listed in `running_works` are enqueued as well.
        still_running = [work for work in self.running_works if isinstance(work, Task)]
        if still_running:
            logger.info(f"enqueue {len(still_running)} earlier running tasks: {still_running[:3]} ...")
            self.try_enqueue(still_running)

        self.suspend_good_executors()
        # Register the status logger first, then the timeline exporter.
        for observer_func in (Scheduler.log_current_status, Scheduler.export_timeline_figs):
            self.add_state_observer(Scheduler.StateObserver(observer_func))
        self.notify_state_observers(force_notify=True)

        try:
            self.local_executor.start(pool)
            self.sched_loop(pool)
        finally:
            # Runs whether the loop returned normally or raised.
            logger.info(f"schedule loop stopped")
            self.sched_running = False
            self.notify_state_observers(force_notify=True)
            self.export_task_metrics()
            self.stop_executors()

        # if --output_path is specified, remove the output root as well
        if self.remove_output_root or self.ctx.final_output_path:
            remove_path(self.ctx.staging_root)
            remove_path(self.ctx.output_root)

        # Temp-file cleanup and the final report only happen on success.
        if self.success:
            self.clean_temp_files(pool)
            logger.success(f"final output path: {self.exec_plan.final_output_path}")
            logger.info(f"analyzed plan:{os.linesep}{self.exec_plan.analyzed_logical_plan.explain_str()}")

    if profiler is not None:
        logger.debug(f"scheduler perf profile:{os.linesep}{cprofile_to_string(profiler)}")

    logger.info(f"scheduler of job {self.ctx.job_id} exits")
    logger.complete()
    return self.success