def sched_loop()

in smallpond/execution/scheduler.py [0:0]


    def sched_loop(self, pool: ThreadPoolExecutor) -> bool:
        """
        Run the scheduling loop until it is stopped or no work remains.

        Returns immediately with True when `self.success` is already set.
        Otherwise, while `self.sched_running` is truthy, each pass:

        1. probes executors and refreshes their states;
        2. pushes the items of `self.local_queue` to the local executor,
           keeping the ones for which `push()` returned a falsy value;
        3. calls `dispatch_tasks(pool)`;
        4. stops at most one idle executor when the scheduling queue is empty
           and `num_pending_nontrivial_tasks + 1 < len(good_executors)`;
        5. leaves the loop when nothing is queued or running;
        6. logs progress if the previous steps made progress, otherwise
           sleeps for `self.ctx.secs_wq_poll_interval` seconds;
        7. notifies state observers, retrying on later passes until
           `notify_state_observers()` returns a truthy value;
        8. calls `process_finished_tasks(pool)`.

        Args:
            pool: thread pool forwarded to `dispatch_tasks` and
                `process_finished_tasks`.

        Returns:
            True if the job had already succeeded on entry, otherwise the
            value of `self.success` once the loop has exited.

        Raises:
            AssertionError: if the local queue is non-empty while the local
                executor is not alive, or if the scheduler goes idle while
                `self.num_pending_tasks` is non-zero.
        """
        if self.success:
            logger.success(f"job already succeeded, stopping scheduler ...")
            return True

        # Starts out True so the first pass logs and notifies instead of sleeping.
        made_progress = True
        notify_pending = False

        while self.sched_running:
            self.probe_executors()
            self.update_executor_states()

            if self.local_queue:
                assert self.local_executor.alive
                logger.info(f"running {len(self.local_queue)} works on local executor: {self.local_queue[:3]} ...")
                # Items for which push() returns a falsy value stay queued for the next pass.
                leftover = []
                for item in self.local_queue:
                    if not self.local_executor.push(item, buffering=True):
                        leftover.append(item)
                self.local_queue = leftover
                self.local_executor.flush()

            if self.dispatch_tasks(pool) > 0:
                made_progress = True

            # Stop a single idle executor per pass when there are more good executors
            # than pending nontrivial tasks plus one.
            if len(self.sched_queue) == 0 and self.num_pending_nontrivial_tasks + 1 < len(self.good_executors):
                executor = next((candidate for candidate in self.good_executors if candidate.idle), None)
                if executor is not None:
                    logger.info(f"{len(self.good_executors)} remote executors running, stopping {executor}")
                    executor.stop()

            # Nothing queued anywhere and nothing running: the loop is done.
            if len(self.sched_queue) == 0 and len(self.local_queue) == 0 and self.num_running_works == 0:
                self.log_overall_progress()
                assert self.num_pending_tasks == 0, (
                    f"scheduler stuck in idle state, but still have {self.num_pending_tasks} pending tasks: {self.tasks.keys() - self.succeeded_tasks.keys()}"
                )
                logger.info(f"no queued or running works, stopping scheduler ...")
                break

            if not made_progress:
                # Idle pass: wait before polling again.
                time.sleep(self.ctx.secs_wq_poll_interval)
            else:
                made_progress = False
                notify_pending = True
                self.log_overall_progress()

            # The notification stays pending until notify_state_observers() returns a truthy value.
            if notify_pending and self.notify_state_observers():
                notify_pending = False

            if self.process_finished_tasks(pool) > 0:
                made_progress = True

        # out of loop
        return self.success