def dispatch_tasks()

in smallpond/execution/scheduler.py [0:0]


    def dispatch_tasks(self, pool: ThreadPoolExecutor) -> int:
        """Dispatch pending work items from the scheduling queue to executors.

        Reorders ``self.sched_queue`` so that tasks from different plan nodes
        are interleaved (fair round-robin across node ids), then repeatedly
        offers the queue head to the least-loaded non-busy executors, subject
        to each executor's slot/CPU/GPU/memory capacity.  Accepted items are
        buffered on their executor and flushed in parallel via *pool* at the
        end.  Items no executor could accept are re-appended to the queue for
        a later round.

        Args:
            pool: thread pool used to flush all executors' buffered work
                queues concurrently.

        Returns:
            The number of tasks dispatched (pushed to an executor) this round.
        """
        # Sort within each node-id group by retry count (retried tasks first
        # when `prioritize_retry` is set, last otherwise), breaking ties by
        # task id for determinism.
        item_sort_key = (lambda item: (-item.retry_count, item.id)) if self.prioritize_retry else (lambda item: (item.retry_count, item.id))
        # itertools.groupby requires its input pre-sorted by the same key.
        items_sorted_by_node_id = sorted(self.sched_queue, key=lambda t: t.node_id)
        items_group_by_node_id = itertools.groupby(items_sorted_by_node_id, key=lambda t: t.node_id)
        sorted_item_groups = [sorted(items, key=item_sort_key) for _, items in items_group_by_node_id]
        # Interleave one item per node group at a time (zip_longest pads the
        # shorter groups with None, which we drop) so that no single node
        # monopolizes the front of the queue.
        self.sched_queue = [item for batch in itertools.zip_longest(*sorted_item_groups, fillvalue=None) for item in batch if item is not None]

        # Heuristic: when few nontrivial tasks remain relative to executor
        # capacity, let tasks claim extra resources via try_boost_resource
        # (presumably to finish the tail of the job faster — see that method).
        final_phase = self.num_pending_nontrivial_tasks - self.num_running_works <= len(self.good_executors) * 2
        num_dispatched_tasks = 0
        unassigned_tasks = []  # items no executor could accept this round

        while self.sched_queue and self.good_executors:
            # Remember the current head so we can detect (by identity, below)
            # a pass in which nothing at the front of the queue was placed.
            first_item = self.sched_queue[0]

            # assign tasks to executors in round-robin fashion,
            # least-loaded (fewest running works) executors first
            usable_executors = [executor for executor in self.good_executors if not executor.busy]
            for executor in sorted(usable_executors, key=lambda exec: len(exec.running_works)):
                if not self.sched_queue:
                    break
                item = self.sched_queue[0]

                # Default memory limit: the executor's memory pro-rated by the
                # task's CPU share of that executor.
                if item._memory_limit is None:
                    item._memory_limit = np.int64(executor.memory_size * item._cpu_limit // executor.cpu_count)

                if item.key in self.succeeded_tasks:
                    # Task already finished (e.g. via an earlier retry): drop
                    # it and enqueue whatever its completion made runnable.
                    logger.debug(f"task {repr(item)} already succeeded, skipping")
                    self.sched_queue.pop(0)
                    self.try_enqueue(self.get_runnable_tasks(self.succeeded_tasks[item.key]))
                elif (
                    # Executor has a free slot, enough remaining CPU/GPU/memory
                    # headroom, and is not already running this very task.
                    len(executor.running_works) < executor.max_running_works
                    and executor.allocated_cpus + item.cpu_limit <= executor.cpu_count
                    and executor.allocated_gpus + item.gpu_limit <= executor.gpu_count
                    and executor.allocated_memory + item.memory_limit <= executor.memory_size
                    and item.key not in executor.running_works
                ):
                    if final_phase:
                        self.try_boost_resource(item, executor)
                    # push to wq of executor but not flushed yet
                    executor.push(item, buffering=True)
                    logger.info(f"appended {repr(item)} ({item.cpu_limit} CPUs, {item.memory_limit/GB:.3f}GB) to the queue of {executor}")
                    self.sched_queue.pop(0)
                    num_dispatched_tasks += 1

            # If the head item is unchanged after trying every usable
            # executor, nobody can take it right now: set it aside so the
            # while-loop is guaranteed to make progress on the next item.
            if self.sched_queue and self.sched_queue[0] is first_item:
                unassigned_tasks.append(self.sched_queue.pop(0))

        # append unassigned tasks to the queue (they keep their relative order)
        self.sched_queue.extend(unassigned_tasks)

        # flush the buffered work items into wq, one flush per executor,
        # run concurrently on the provided thread pool
        # NOTE(review): `assert` is stripped under `python -O`, so a failed
        # flush would then pass silently — consider raising explicitly.
        assert all(pool.map(RemoteExecutor.flush, self.good_executors)), f"failed to flush work queues"
        return num_dispatched_tasks