def exec_loop()

in smallpond/execution/executor.py [0:0]


    def exec_loop(self, pool: SimplePool) -> bool:
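        # Pull work items from the work queue (wq), dispatch them to the process
        # pool, and hand each one the completion queue (cq) to report results on,
        # until a stop request arrives or the scheduler goes silent.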
        stop_request = None
        latest_probe_time = time.time()

        while self.running:
            # get new work items
            try:
                items = self.wq.pop(count=self.ctx.usable_cpu_count)
            except Exception as ex:
                logger.opt(exception=ex).critical(f"failed to pop from work queue: {self.wq}")
                self.running = False
                items = []

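            # idle path: no new work, so check whether the scheduler is still alive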
            if not items:
                secs_quiet_period = time.time() - latest_probe_time
                if secs_quiet_period > self.ctx.secs_executor_probe_interval * 2 and os.path.exists(self.ctx.job_status_path):
                    with open(self.ctx.job_status_path) as status_file:
                        if (status := status_file.read().strip()) and not status.startswith("running"):
                            logger.critical(f"job scheduler already stopped: {status}, stopping executor")
                            self.running = False
                            break
                if secs_quiet_period > self.ctx.secs_executor_probe_timeout * 2 and not pytest_running():
                    logger.critical(f"no probe received for {secs_quiet_period:.1f} secs, stopping executor")
                    self.running = False
                    break
                # no pending work, so wait a few seconds before checking results
                time.sleep(self.ctx.secs_wq_poll_interval)

            for item in items:
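                # a StopExecutor item is the scheduler asking this executor to shut down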
                if isinstance(item, StopExecutor):
                    logger.info(f"stop request received from scheduler: {item}")
                    stop_request = item
                    self.running = False
                    break

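                # a StopWorkItem asks to terminate one specific running task; if the
                # task is not running here, bounce the request back on the completion
                # queue so the scheduler still gets a reply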
                if isinstance(item, StopWorkItem):
                    running_work = self.running_works.get(item.work_to_stop, None)
                    if running_work is None:
                        logger.debug(f"cannot find {item.work_to_stop} in running works of {self.id}")
                        self.cq.push(item)
                    else:
                        logger.info(f"stopping work: {item.work_to_stop}")
                        task, _ = running_work
                        task.terminate()
                    continue

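                # a Probe is the scheduler's heartbeat; it may carry a count of epochs
                # to skip, and otherwise falls through to run like a normal work item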
                if isinstance(item, Probe):
                    latest_probe_time = time.time()
                    if item.epochs_to_skip > 0:
                        self.epochs_to_skip += item.epochs_to_skip
                    if self.epochs_to_skip > 0:
                        self.epochs_to_skip -= 1
                        continue

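                # pin the work item to this executor's NUMA node, if one is assigned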
                if self.numa_node is not None:
                    item._numa_node = self.numa_node

                # wait and allocate GPU to work item
                if item.gpu_limit > 0:
                    if item.gpu_limit > len(self.local_gpus):
                        logger.warning(
                            f"task {item.key} requires more GPUs than physical GPUs, downgrading from {item.gpu_limit} to {len(self.local_gpus)}"
                        )
                        item.gpu_limit = len(self.local_gpus)
                    # FIXME: this will block the executor if there is no available GPU
                    while not (granted_gpus := self.acquire_gpu(item.gpu_limit)):
                        logger.info(f"collecting finished works to find available GPUs")
                        self.collect_finished_works()
                        time.sleep(self.ctx.secs_wq_poll_interval)
                    item._local_gpu = granted_gpus
                    logger.info(f"{repr(item)} is assigned to run on GPU: { {gpu.id: quota for gpu, quota in item._local_gpu.items()} }")

                # enqueue work item to the pool
                self.running_works[item.key] = (
                    pool.apply_async(func=Executor.process_work, args=(item, self.cq), name=item.key),
                    item,
                )
                logger.info(f"started work: {repr(item)}, {len(self.running_works)} running works: {list(self.running_works.keys())[:10]}...")

            # start to run works
            pool.update_queue()
            self.collect_finished_works()

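        # loop exited: give in-flight tasks a grace period to finish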
        pool.join(self.ctx.secs_executor_probe_interval)

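        # acknowledge the stop request so the scheduler knows this executor drained
        # its results and exited cleanly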
        if stop_request and stop_request.ack:
            self.collect_finished_works()
            stop_request.exec()
            self.cq.push(stop_request)

        return True
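
The method boils down to a poll/dispatch/heartbeat pattern: pop work items, special-case the sentinel types (StopExecutor, StopWorkItem, Probe), submit everything else to a pool, and shut down once the scheduler stops talking. Below is a minimal, self-contained sketch of that pattern using standard-library stand-ins (queue.Queue for the work/completion queues, ThreadPoolExecutor for SimplePool); every name in it is illustrative, not smallpond's actual API:

    import queue
    import time
    from concurrent.futures import ThreadPoolExecutor

    PROBE_TIMEOUT = 5.0  # stand-in for ctx.secs_executor_probe_timeout
    POLL_INTERVAL = 0.2  # stand-in for ctx.secs_wq_poll_interval

    class StopExecutor:  # sentinel: shut the whole loop down
        pass

    class Probe:  # sentinel: scheduler heartbeat
        pass

    def process_work(work):  # stand-in for Executor.process_work
        return work * 2

    def exec_loop(wq: queue.Queue, cq: queue.Queue) -> None:
        latest_probe_time = time.time()
        futures = []
        with ThreadPoolExecutor(max_workers=4) as pool:
            while True:
                try:
                    item = wq.get(timeout=POLL_INTERVAL)
                except queue.Empty:
                    # idle: stop once the heartbeat has gone silent for too long
                    if time.time() - latest_probe_time > PROBE_TIMEOUT:
                        break
                    item = None
                if isinstance(item, StopExecutor):
                    break
                if isinstance(item, Probe):
                    latest_probe_time = time.time()
                elif item is not None:
                    futures.append(pool.submit(process_work, item))
                # collect finished works into the completion queue
                for fut in [f for f in futures if f.done()]:
                    cq.put(fut.result())
                    futures.remove(fut)
            # drain: wait for in-flight work and report its results
            for fut in futures:
                cq.put(fut.result())

    # usage: two work items, a heartbeat, then a stop request
    wq, cq = queue.Queue(), queue.Queue()
    for msg in (1, 2, Probe(), StopExecutor()):
        wq.put(msg)
    exec_loop(wq, cq)
    print(sorted(cq.get() for _ in range(2)))  # [2, 4]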