in smallpond/execution/scheduler.py
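
# NOTE: this excerpt assumes the module-level names imported in scheduler.py:
# itertools, np (numpy), logger, the GB size constant, RemoteExecutor, and
# concurrent.futures.ThreadPoolExecutor.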
def dispatch_tasks(self, pool: ThreadPoolExecutor):
    # sort pending tasks within each node group: retried tasks first if
    # prioritize_retry is set, fresh tasks first otherwise; ties broken by task id
    item_sort_key = (
        (lambda item: (-item.retry_count, item.id))
        if self.prioritize_retry
        else (lambda item: (item.retry_count, item.id))
    )
    items_sorted_by_node_id = sorted(self.sched_queue, key=lambda t: t.node_id)
    items_group_by_node_id = itertools.groupby(items_sorted_by_node_id, key=lambda t: t.node_id)
    sorted_item_groups = [sorted(items, key=item_sort_key) for _, items in items_group_by_node_id]
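    # interleave one task from each node's group per round: e.g. groups
    # [[a1, a2], [b1], [c1, c2]] become [a1, b1, c1, a2, c2], so no single
    # plan node can monopolize the head of the queue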
    self.sched_queue = [
        item
        for batch in itertools.zip_longest(*sorted_item_groups, fillvalue=None)
        for item in batch
        if item is not None
    ]

    # final phase: at most two pending non-trivial tasks per healthy executor
    # remain, so spare resources can be boosted for the stragglers below
    final_phase = self.num_pending_nontrivial_tasks - self.num_running_works <= len(self.good_executors) * 2
    num_dispatched_tasks = 0
    unassigned_tasks = []

    while self.sched_queue and self.good_executors:
        first_item = self.sched_queue[0]
        # assign tasks to executors in round-robin fashion, least-loaded executor first
        usable_executors = [executor for executor in self.good_executors if not executor.busy]
        for executor in sorted(usable_executors, key=lambda exec: len(exec.running_works)):
            if not self.sched_queue:
                break
            item = self.sched_queue[0]
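            # default memory limit is proportional to the task's CPU share:
            # e.g. a 4-CPU task on a 16-core executor with 128 GB gets
            # 128 GB * 4 // 16 = 32 GB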
            if item._memory_limit is None:
                item._memory_limit = np.int64(executor.memory_size * item._cpu_limit // executor.cpu_count)
            if item.key in self.succeeded_tasks:
                logger.debug(f"task {repr(item)} already succeeded, skipping")
                self.sched_queue.pop(0)
                self.try_enqueue(self.get_runnable_tasks(self.succeeded_tasks[item.key]))
            elif (
                len(executor.running_works) < executor.max_running_works
                and executor.allocated_cpus + item.cpu_limit <= executor.cpu_count
                and executor.allocated_gpus + item.gpu_limit <= executor.gpu_count
                and executor.allocated_memory + item.memory_limit <= executor.memory_size
                and item.key not in executor.running_works
            ):
                if final_phase:
                    self.try_boost_resource(item, executor)
                # push to the executor's work queue, buffered; flushed at the end of this call
                executor.push(item, buffering=True)
                logger.info(f"appended {repr(item)} ({item.cpu_limit} CPUs, {item.memory_limit/GB:.3f}GB) to the queue of {executor}")
                self.sched_queue.pop(0)
                num_dispatched_tasks += 1
        if self.sched_queue and self.sched_queue[0] is first_item:
            # the head task was not accepted by any executor in this pass: park it
            unassigned_tasks.append(self.sched_queue.pop(0))

    # append unassigned tasks back to the queue for the next dispatch round
    self.sched_queue.extend(unassigned_tasks)
    # flush the buffered work items into the executors' work queues in parallel
    assert all(pool.map(RemoteExecutor.flush, self.good_executors)), "failed to flush work queues"
    return num_dispatched_tasks
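
For intuition, here is a minimal self-contained sketch of the dispatch loop's core idea: walk the executors from least to most loaded, admit the task at the head of the queue only if it fits the executor's resource budget, and park any task that no executor can currently take. Task, Executor, and dispatch below are hypothetical stand-ins, not smallpond's actual classes, and the resource model is reduced to CPUs and memory.

from dataclasses import dataclass, field
from typing import List

@dataclass
class Task:  # hypothetical stand-in for a smallpond work item
    id: int
    cpu: int
    mem: int

@dataclass
class Executor:  # hypothetical stand-in for RemoteExecutor
    name: str
    cpu_count: int
    memory_size: int
    running: List[Task] = field(default_factory=list)

    @property
    def allocated_cpus(self) -> int:
        return sum(t.cpu for t in self.running)

    @property
    def allocated_memory(self) -> int:
        return sum(t.mem for t in self.running)

def dispatch(queue: List[Task], executors: List[Executor]) -> None:
    unassigned = []
    while queue:
        head = queue[0]
        # least-loaded executor first, mirroring the sort on len(running_works)
        for ex in sorted(executors, key=lambda e: len(e.running)):
            if not queue:
                break
            task = queue[0]
            if (ex.allocated_cpus + task.cpu <= ex.cpu_count
                    and ex.allocated_memory + task.mem <= ex.memory_size):
                ex.running.append(queue.pop(0))
        if queue and queue[0] is head:
            # no executor accepted the head task in this pass: park it
            unassigned.append(queue.pop(0))
    # parked tasks go back for a later dispatch round
    queue.extend(unassigned)

executors = [Executor("a", cpu_count=4, memory_size=16),
             Executor("b", cpu_count=2, memory_size=8)]
queue = [Task(1, 2, 8), Task(2, 2, 8), Task(3, 4, 16), Task(4, 1, 4)]
dispatch(queue, executors)
print([t.id for ex in executors for t in ex.running])  # -> [1, 4, 2]
print([t.id for t in queue])                           # -> [3]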