in smallpond/execution/executor.py [0:0]
def exec_loop(self, pool: SimplePool) -> bool:
    """Main executor event loop: pull work items from the work queue, dispatch
    them onto *pool*, and collect finished work until stopped.

    The loop exits when:
      * a `StopExecutor` control item arrives from the scheduler,
      * the job status file exists and no longer starts with "running",
      * no probe has been received for `2 * secs_executor_probe_timeout`
        seconds (suppressed while running under pytest), or
      * popping from the work queue raises.

    Args:
        pool: the process pool that actually runs work items via
            `Executor.process_work`.

    Returns:
        Always ``True`` (the return value does not encode success/failure;
        fatal conditions are logged and flip ``self.running`` instead).
    """
    stop_request = None  # the StopExecutor item, kept so it can be acked after the loop
    latest_probe_time = time.time()
    while self.running:
        # get new work items
        try:
            items = self.wq.pop(count=self.ctx.usable_cpu_count)
        except Exception as ex:
            # queue failure is fatal: log and fall through with an empty batch
            # so the loop terminates on the next `while self.running` check
            logger.opt(exception=ex).critical(f"failed to pop from work queue: {self.wq}")
            self.running = False
            items = []
        if not items:
            # idle path: watch for scheduler death via the status file and the
            # probe-timeout watchdog, then sleep before polling again
            secs_quiet_period = time.time() - latest_probe_time
            if secs_quiet_period > self.ctx.secs_executor_probe_interval * 2 and os.path.exists(self.ctx.job_status_path):
                with open(self.ctx.job_status_path) as status_file:
                    # any non-empty status not starting with "running" means the
                    # scheduler already finished/failed, so stop this executor
                    if (status := status_file.read().strip()) and not status.startswith("running"):
                        logger.critical(f"job scheduler already stopped: {status}, stopping executor")
                        self.running = False
                        break
            if secs_quiet_period > self.ctx.secs_executor_probe_timeout * 2 and not pytest_running():
                # watchdog: scheduler probes have gone silent for too long
                logger.critical(f"no probe received for {secs_quiet_period:.1f} secs, stopping executor")
                self.running = False
                break
            # no pending works, so wait a few seconds before checking results
            time.sleep(self.ctx.secs_wq_poll_interval)
        for item in items:
            # --- control items are handled first, before scheduling ---
            if isinstance(item, StopExecutor):
                logger.info(f"stop request received from scheduler: {item}")
                stop_request = item
                self.running = False
                break  # drop the rest of this batch; outer loop exits next check
            if isinstance(item, StopWorkItem):
                running_work = self.running_works.get(item.work_to_stop, None)
                if running_work is None:
                    # target work not running here: bounce the stop item back on
                    # the completion queue so the scheduler learns it was handled
                    logger.debug(f"cannot find {item.work_to_stop} in running works of {self.id}")
                    self.cq.push(item)
                else:
                    logger.info(f"stopping work: {item.work_to_stop}")
                    task, _ = running_work
                    task.terminate()
                continue
            if isinstance(item, Probe):
                # probes are the scheduler's liveness heartbeat; they also carry
                # an epoch-skip budget that is accumulated and drained here
                # (NOTE(review): decrement appears to tick once per probe — confirm)
                latest_probe_time = time.time()
                if item.epochs_to_skip > 0:
                    self.epochs_to_skip += item.epochs_to_skip
                if self.epochs_to_skip > 0:
                    self.epochs_to_skip -= 1
                continue
            # --- from here on, `item` is a regular work item ---
            if self.numa_node is not None:
                item._numa_node = self.numa_node
            # wait and allocate GPU to work item
            if item.gpu_limit > 0:
                if item.gpu_limit > len(self.local_gpus):
                    # cap the request at the physically available GPU count
                    logger.warning(
                        f"task {item.key} requires more GPUs than physical GPUs, downgrading from {item.gpu_limit} to {len(self.local_gpus)}"
                    )
                    item.gpu_limit = len(self.local_gpus)
                # FIXME: this will block the executor if there is no available GPU
                while not (granted_gpus := self.acquire_gpu(item.gpu_limit)):
                    # reap completed work in the hope it frees GPU quota
                    logger.info(f"collecting finished works to find available GPUs")
                    self.collect_finished_works()
                    time.sleep(self.ctx.secs_wq_poll_interval)
                item._local_gpu = granted_gpus
                logger.info(f"{repr(item)} is assigned to run on GPU: { {gpu.id: quota for gpu, quota in item._local_gpu.items()} }")
            # enqueue work item to the pool
            self.running_works[item.key] = (
                pool.apply_async(func=Executor.process_work, args=(item, self.cq), name=item.key),
                item,
            )
            logger.info(f"started work: {repr(item)}, {len(self.running_works)} running works: {list(self.running_works.keys())[:10]}...")
        # start to run works
        pool.update_queue()
        self.collect_finished_works()
        # bounded wait so the loop keeps polling the work queue periodically
        pool.join(self.ctx.secs_executor_probe_interval)
    if stop_request and stop_request.ack:
        # final drain, then acknowledge the stop request back to the scheduler
        self.collect_finished_works()
        stop_request.exec()
        self.cq.push(stop_request)
    return True