def run()

in smallpond/execution/scheduler.py [0:0]


    def run(self) -> bool:
        """Run this scheduler epoch to completion and report the job outcome.

        The method performs, in order:

        1. Rename the current process after the scheduler epoch and, when
           ``self.ctx.enable_profiling`` is set, start a ``cProfile`` profile.
        2. Inside a 32-worker thread pool: reset the per-run bookkeeping,
           requeue ``Task`` items left in ``local_queue`` / ``sched_queue``,
           enqueue the plan's leaf tasks on epoch 0, drain already finished
           tasks, and requeue ``Task`` items still listed in ``running_works``.
        3. Register the state observers, start the local executor and run
           ``sched_loop``.
        4. Whether or not the loop raised: mark the scheduler as stopped,
           notify observers, export task metrics and stop the executors.
        5. Optionally remove the staging/output roots, and on success clean
           temporary files and log the final output path and analyzed plan.

        Returns:
            The value of ``self.success`` after the schedule loop has stopped.

        Raises:
            Whatever ``local_executor.start`` / ``sched_loop`` or the cleanup
            steps raise; nothing is swallowed here.
        """
        mp.current_process().name = f"SchedulerMainProcess#{self.sched_epoch}"
        logger.info(f"start to run scheduler #{self.sched_epoch} on {socket.gethostname()}")

        perf_profile = None
        if self.ctx.enable_profiling:
            perf_profile = cProfile.Profile()
            perf_profile.enable()

        with ThreadPoolExecutor(32) as pool:
            # reset per-run bookkeeping
            self.sched_running = True
            self.sched_start_time = time.time()
            self.last_executor_probe_time = 0
            self.last_state_notify_time = 0
            # sticky flag: switched on for every epoch after the first one
            self.prioritize_retry |= self.sched_epoch > 0

            # items left in the queues (presumably by an earlier epoch — confirm)
            # are re-submitted through try_enqueue; only Task items are kept
            if self.local_queue or self.sched_queue:
                pending_tasks = [item for item in self.local_queue + self.sched_queue if isinstance(item, Task)]
                self.local_queue.clear()
                self.sched_queue.clear()
                logger.info(f"requeue {len(pending_tasks)} pending tasks with latest epoch #{self.sched_epoch}: {pending_tasks[:3]} ...")
                self.try_enqueue(pending_tasks)

            # the very first epoch is seeded with the leaves of the execution plan
            if self.sched_epoch == 0:
                leaf_tasks = self.exec_plan.leaves
                logger.info(f"enqueue {len(leaf_tasks)} leaf tasks: {leaf_tasks[:3]} ...")
                self.try_enqueue(leaf_tasks)

            # keep draining until a pass reports no finished task
            self.log_overall_progress()
            while (num_finished_tasks := self.process_finished_tasks(pool)) > 0:
                logger.info(f"processed {num_finished_tasks} finished tasks during startup")
                self.log_overall_progress()

            # Task items still listed as running after the drain are enqueued again
            earlier_running_tasks = [item for item in self.running_works if isinstance(item, Task)]
            if earlier_running_tasks:
                logger.info(f"enqueue {len(earlier_running_tasks)} earlier running tasks: {earlier_running_tasks[:3]} ...")
                self.try_enqueue(earlier_running_tasks)

            self.suspend_good_executors()
            self.add_state_observer(Scheduler.StateObserver(Scheduler.log_current_status))
            self.add_state_observer(Scheduler.StateObserver(Scheduler.export_timeline_figs))
            self.notify_state_observers(force_notify=True)

            try:
                self.local_executor.start(pool)
                self.sched_loop(pool)
            finally:
                logger.info("schedule loop stopped")
                self.sched_running = False
                try:
                    self.notify_state_observers(force_notify=True)
                    self.export_task_metrics()
                finally:
                    # executors must be stopped even when the final
                    # notification or metrics export raises
                    self.stop_executors()

            # if --output_path is specified, remove the output root as well
            if self.remove_output_root or self.ctx.final_output_path:
                remove_path(self.ctx.staging_root)
                remove_path(self.ctx.output_root)

            if self.success:
                self.clean_temp_files(pool)
                logger.success(f"final output path: {self.exec_plan.final_output_path}")
                logger.info(f"analyzed plan:{os.linesep}{self.exec_plan.analyzed_logical_plan.explain_str()}")

        # the profile is reported only after the thread pool has been shut down
        if perf_profile is not None:
            logger.debug(f"scheduler perf profile:{os.linesep}{cprofile_to_string(perf_profile)}")

        logger.info(f"scheduler of job {self.ctx.job_id} exits")
        # NOTE(review): presumably waits for queued log records to be emitted — confirm
        logger.complete()
        return self.success