in smallpond/execution/scheduler.py [0:0]
def process_finished_tasks(self, pool: ThreadPoolExecutor) -> int:
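    # Fan the RemoteExecutor.pop calls out over the thread pool; each call yields
    # the tasks that have finished on that executor since it was last polled.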
    pop_results = pool.map(RemoteExecutor.pop, self.available_executors.values())
    num_finished_tasks = 0
    for executor, finished_tasks in zip(self.available_executors.values(), pop_results):
        for finished_task in finished_tasks:
            assert isinstance(finished_task, Task)
            scheduled_task = self.scheduled_tasks.get(finished_task.runtime_id, None)
            if scheduled_task is None:
                logger.info(f"task not initiated by current scheduler: {finished_task}")
            if finished_task.status != WorkStatus.SUCCEED and (
                missing_inputs := [key for key in finished_task.input_deps if key not in self.succeeded_tasks]
            ):
                logger.info(f"ignore {repr(finished_task)} since some of the input deps are missing: {missing_inputs}")
                continue
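            # An INCOMPLETE result carries a checkpoint rather than a final outcome:
            # record its runtime state on the scheduler-side task and keep waiting.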
            if finished_task.status == WorkStatus.INCOMPLETE:
                logger.trace(f"{repr(finished_task)} checkpoint created on {executor.id}: {finished_task.runtime_state}")
                self.tasks[finished_task.key].runtime_state = finished_task.runtime_state
                continue
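            # Deduplicate by runtime_id: only the first result reported for a given
            # task attempt is recorded and counted.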
            prior_task = self.finished_tasks.get(finished_task.runtime_id, None)
            if prior_task is not None:
                logger.info(f"found duplicate tasks, current: {repr(finished_task)}, prior: {repr(prior_task)}")
                continue
            else:
                self.finished_tasks[finished_task.runtime_id] = finished_task
                num_finished_tasks += 1
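            # If another attempt with the same task key has already succeeded,
            # this result is redundant.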
            succeeded_task = self.succeeded_tasks.get(finished_task.key, None)
            if succeeded_task is not None:
                logger.info(f"task already succeeded, current: {repr(finished_task)}, succeeded: {repr(succeeded_task)}")
                continue
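            # Failure path: dump the failed task, bump its fail count (stopping the
            # scheduler if it exceeds max_fail_count), apply the OOM / executor
            # failure policies, and enqueue a retry.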
            if finished_task.status in (WorkStatus.FAILED, WorkStatus.CRASHED):
                logger.warning(f"task failed on {executor.id}: {finished_task}, error: {finished_task.exception}")
                finished_task.dump()
                task = self.tasks[finished_task.key]
                task.fail_count += 1
                if task.fail_count > self.max_fail_count:
                    logger.critical(f"task failed too many times: {finished_task}, stopping ...")
                    self.stop_executors()
                    self.sched_running = False
                if not executor.local and finished_task.oom(self.nonzero_exitcode_as_oom):
                    if task._memory_limit is None:
                        task._memory_limit = finished_task._memory_limit
                    self.try_relax_memory_limit(task, executor)
                if not executor.local and self.stop_executor_on_failure:
                    logger.warning(f"stop executor: {executor}")
                    executor.stop()
                self.try_enqueue(self.get_retry_task(finished_task.key))
            else:
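                # Success path: record the result, cancel redundant retries, and
                # enqueue downstream tasks that have become runnable.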
                assert finished_task.status == WorkStatus.SUCCEED, f"unexpected task status: {finished_task}"
                logger.log(
                    "TRACE" if finished_task.exec_on_scheduler else "INFO",
                    "task succeeded on {}: {}",
                    finished_task.exec_id,
                    finished_task,
                )
                self.succeeded_tasks[finished_task.key] = finished_task
                if not finished_task.exec_on_scheduler:
                    self.succeeded_nontrivial_tasks[finished_task.key] = finished_task
                # stop the redundant retries of finished task
                self.stop_running_tasks(finished_task.key)
                self.save_task_final_state(finished_task)
                self.try_enqueue(self.get_runnable_tasks(finished_task))
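                # The root task finishing means the whole execution plan is done:
                # drain the queue and shut down all executors.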
                if finished_task.id == self.exec_plan.root_task.id:
                    self.sched_queue = []
                    self.stop_executors()
                    logger.success(f"all tasks completed, root task: {finished_task}")
                    logger.success(
                        f"{len(self.succeeded_tasks)} succeeded tasks, success: {self.success}, elapsed time: {self.elapsed_time:.3f} secs"
                    )
            # clear inputs since they are not needed anymore
            finished_task.input_datasets = []
    return num_finished_tasks