def process_finished_tasks()

in smallpond/execution/scheduler.py
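
Drains finished tasks from all available executors (polled in parallel on the given thread pool) and updates the scheduler's bookkeeping: checkpoints from incomplete tasks are recorded, duplicate and stale reports are dropped, failed tasks are retried (with optional memory-limit relaxation or executor shutdown), and succeeded tasks unblock their downstream tasks. Returns the number of newly finished tasks.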


    def process_finished_tasks(self, pool: ThreadPoolExecutor) -> int:
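        # Poll all available executors in parallel; each pop() returns the batch
        # of tasks that have finished on that executor.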
        pop_results = pool.map(RemoteExecutor.pop, self.available_executors.values())
        num_finished_tasks = 0

        for executor, finished_tasks in zip(self.available_executors.values(), pop_results):

            for finished_task in finished_tasks:
                assert isinstance(finished_task, Task)

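                # Tasks not launched by this scheduler are still adopted, unless
                # they failed and some of their input deps have not succeeded here.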
                scheduled_task = self.scheduled_tasks.get(finished_task.runtime_id, None)
                if scheduled_task is None:
                    logger.info(f"task not initiated by current scheduler: {finished_task}")
                    if finished_task.status != WorkStatus.SUCCEED and (
                        missing_inputs := [key for key in finished_task.input_deps if key not in self.succeeded_tasks]
                    ):
                        logger.info(f"ignore {repr(finished_task)} since some of the input deps are missing: {missing_inputs}")
                        continue

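                # An INCOMPLETE status means the task produced a checkpoint:
                # record its runtime state and keep waiting for the final result.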
                if finished_task.status == WorkStatus.INCOMPLETE:
                    logger.trace(f"{repr(finished_task)} checkpoint created on {executor.id}: {finished_task.runtime_state}")
                    self.tasks[finished_task.key].runtime_state = finished_task.runtime_state
                    continue

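                # The same task attempt may be reported more than once; count it
                # only the first time.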
                prior_task = self.finished_tasks.get(finished_task.runtime_id, None)
                if prior_task is not None:
                    logger.info(f"found duplicate tasks, current: {repr(finished_task)}, prior: {repr(prior_task)}")
                    continue
                self.finished_tasks[finished_task.runtime_id] = finished_task
                num_finished_tasks += 1

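                # Another attempt of this task may have already succeeded; drop
                # the late result.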
                succeeded_task = self.succeeded_tasks.get(finished_task.key, None)
                if succeeded_task is not None:
                    logger.info(f"task already succeeded, current: {repr(finished_task)}, succeeded: {repr(succeeded_task)}")
                    continue

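                # Failure path: log and dump the failed task, bump its fail count,
                # and decide whether to stop, relax memory limits, or retry.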
                if finished_task.status in (WorkStatus.FAILED, WorkStatus.CRASHED):
                    logger.warning(f"task failed on {executor.id}: {finished_task}, error: {finished_task.exception}")
                    finished_task.dump()

                    task = self.tasks[finished_task.key]
                    task.fail_count += 1

                    if task.fail_count > self.max_fail_count:
                        logger.critical(f"task failed too many times: {finished_task}, stopping ...")
                        self.stop_executors()
                        self.sched_running = False

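                    # If a remote task looks like an OOM victim, inherit its memory
                    # limit and try to relax it before the retry.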
                    if not executor.local and finished_task.oom(self.nonzero_exitcode_as_oom):
                        if task._memory_limit is None:
                            task._memory_limit = finished_task._memory_limit
                        self.try_relax_memory_limit(task, executor)

                    if not executor.local and self.stop_executor_on_failure:
                        logger.warning(f"stop executor: {executor}")
                        executor.stop()

                    self.try_enqueue(self.get_retry_task(finished_task.key))
                else:
                    assert finished_task.status == WorkStatus.SUCCEED, f"unexpected task status: {finished_task}"
                    logger.log(
                        "TRACE" if finished_task.exec_on_scheduler else "INFO",
                        "task succeeded on {}: {}",
                        finished_task.exec_id,
                        finished_task,
                    )

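                    # Success path: record the result (nontrivial tasks are tracked
                    # separately), cancel redundant retries, persist the final
                    # state, and enqueue downstream tasks that became runnable.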
                    self.succeeded_tasks[finished_task.key] = finished_task
                    if not finished_task.exec_on_scheduler:
                        self.succeeded_nontrivial_tasks[finished_task.key] = finished_task

                    # stop the redundant retries of finished task
                    self.stop_running_tasks(finished_task.key)
                    self.save_task_final_state(finished_task)
                    self.try_enqueue(self.get_runnable_tasks(finished_task))

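                    # The root task finishing means the whole execution plan is
                    # complete: drop the queue and stop all executors.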
                    if finished_task.id == self.exec_plan.root_task.id:
                        self.sched_queue = []
                        self.stop_executors()
                        logger.success(f"all tasks completed, root task: {finished_task}")
                        logger.success(
                            f"{len(self.succeeded_tasks)} succeeded tasks, success: {self.success}, elapsed time: {self.elapsed_time:.3f} secs"
                        )

                # clear inputs since they are not needed anymore
                finished_task.input_datasets = []

        return num_finished_tasks
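
For context, here is a minimal sketch of a driver loop that could call this method. The loop shape, run_poll_loop, and poll_interval are illustrative assumptions; only process_finished_tasks and sched_running come from the code above.

    import time
    from concurrent.futures import ThreadPoolExecutor

    def run_poll_loop(scheduler, poll_interval: float = 1.0) -> None:
        # Hypothetical driver: keep draining finished tasks while the
        # scheduler is running, backing off briefly when nothing finished.
        with ThreadPoolExecutor() as pool:
            while scheduler.sched_running:
                if scheduler.process_finished_tasks(pool) == 0:
                    time.sleep(poll_interval)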