in smallpond/execution/scheduler.py [0:0]
def sched_loop(self, pool: ThreadPoolExecutor) -> bool:
    """
    Drive the scheduler until it is stopped or runs out of work.

    Each iteration calls ``probe_executors()`` and ``update_executor_states()``,
    pushes queued local works to the local executor, dispatches tasks through
    ``pool``, stops at most one idle executor when nothing is waiting in
    ``sched_queue`` and there are more good executors than pending non-trivial
    tasks + 1, logs progress / notifies state observers, and finally processes
    finished tasks. The loop ends when ``self.sched_running`` becomes falsy or
    when nothing is queued or running.

    Args:
        pool: Passed through to ``dispatch_tasks`` and ``process_finished_tasks``.

    Returns:
        ``True`` immediately if the job had already succeeded before the loop
        started; otherwise the value of ``self.success`` after the loop exits.

    Raises:
        AssertionError: If local works are queued but the local executor is not
            alive, or if the scheduler becomes idle while tasks are still
            pending. These are explicit ``raise`` statements rather than
            ``assert`` so the checks still run under ``python -O``.
    """
    # Start as "progressed" so the first iteration logs and notifies observers.
    has_progress = True
    # Stays set until notify_state_observers() returns a truthy value, so a
    # failed notification is retried on the next iteration.
    do_notify = False

    if self.success:
        logger.success("job already succeeded, stopping scheduler ...")
        return True

    while self.sched_running:
        self.probe_executors()
        self.update_executor_states()

        if self.local_queue:
            if not self.local_executor.alive:
                raise AssertionError("local queue is not empty but the local executor is not alive")
            logger.info(f"running {len(self.local_queue)} works on local executor: {self.local_queue[:3]} ...")
            # Items for which push() returns a falsy value stay queued and are
            # pushed again on a later iteration.
            self.local_queue = [item for item in self.local_queue if not self.local_executor.push(item, buffering=True)]
            self.local_executor.flush()

        has_progress |= self.dispatch_tasks(pool) > 0

        # Nothing left to schedule and more executors than needed: stop one
        # idle executor per iteration (the loop breaks after the first stop).
        if len(self.sched_queue) == 0 and self.num_pending_nontrivial_tasks + 1 < len(self.good_executors):
            for executor in self.good_executors:
                if executor.idle:
                    logger.info(f"{len(self.good_executors)} remote executors running, stopping {executor}")
                    executor.stop()
                    break

        # No queued or running works: either every task is done, or the
        # scheduler is stuck with pending tasks it can no longer make progress on.
        if len(self.sched_queue) == 0 and len(self.local_queue) == 0 and self.num_running_works == 0:
            self.log_overall_progress()
            num_pending = self.num_pending_tasks
            if num_pending != 0:
                raise AssertionError(
                    f"scheduler stuck in idle state, but still have {num_pending} pending tasks: "
                    f"{self.tasks.keys() - self.succeeded_tasks.keys()}"
                )
            logger.info("no queued or running works, stopping scheduler ...")
            break

        if has_progress:
            has_progress = False
            do_notify = True
            self.log_overall_progress()
        else:
            # No progress this iteration: wait one poll interval before looping.
            time.sleep(self.ctx.secs_wq_poll_interval)

        if do_notify:
            do_notify = not self.notify_state_observers()

        has_progress |= self.process_finished_tasks(pool) > 0

    # out of loop
    return self.success