in smallpond/execution/scheduler.py [0:0]
def update_executor_states(self):
    """Refresh every alive executor and reschedule work from newly failed ones.

    For each executor in ``self.alive_executors`` this calls
    ``executor.update_state(self.probe_epoch)`` and records the returned
    value. When an executor's state has just become ``ExecutorState.FAIL``
    (i.e. it differs from the state observed before the update), every item
    in its ``running_works`` is marked ``WorkStatus.EXEC_FAILED`` with the
    current time as its finish time. Items that are ``Task`` instances and
    whose key is not in ``self.succeeded_tasks`` are re-enqueued through
    ``self.get_retry_task`` / ``self.try_enqueue``.

    If any ``update_state`` call returned a truthy value, the cached
    executor lists are cleared and a summary of executor counts is logged.
    """
    update_results = []

    for executor in self.alive_executors:
        state_before = executor.state
        update_results.append(executor.update_state(self.probe_epoch))

        # Only react to the transition *into* FAIL, not to executors that
        # were already failed before this update.
        newly_failed = executor.state == ExecutorState.FAIL and executor.state != state_before
        if not newly_failed:
            continue

        for item in executor.running_works.values():
            item.status = WorkStatus.EXEC_FAILED
            item.finish_time = time.time()

            # Skip non-task work items and tasks already recorded as succeeded.
            if not isinstance(item, Task) or item.key in self.succeeded_tasks:
                continue

            logger.warning(f"reschedule {repr(item)} on failed executor: {repr(executor)}")
            self.try_enqueue(self.get_retry_task(item.key))

    if not any(update_results):
        return

    # Clear the cached lists before reading them for the summary below.
    self.clear_cached_executor_lists()
    summary = (
        f"in total {len(self.available_executors)} executors: "
        f"{len(self.local_executors)} local, "
        f"{len(self.good_executors)} good, "
        f"{len(self.failed_executors)} failed, "
        f"{len(self.stopped_executors)} stopped, "
        f"{len(self.stopping_executors)} stopping, "
        f"{len(self.low_resource_executors)} low-resource"
    )
    logger.info(summary)