in smallpond/execution/scheduler.py [0:0]
def pop(self) -> List[Task]:
    """Drain one batch from the completion queue and return its tasks.

    Asks ``self.cq`` for ``max(1, len(self.running_works))`` items and
    handles each one according to its type:

    * ``Probe`` -- ``self.last_acked_probe`` becomes the larger of its
      current value and the probe.
    * ``StopWorkItem`` -- ``pop_running_work`` is called with the item's
      ``work_to_stop``.
    * ``StopExecutor`` -- ``self.stop_request_acked`` is OR-ed with whether
      ``self.state`` differs from ``ExecutorState.STOPPED``.
    * anything else is asserted to be a ``Task``: a falsy ``finish_time``
      is replaced by the current time, the task is added to the result,
      and unless its status is ``WorkStatus.INCOMPLETE`` its key is handed
      to ``pop_running_work``.

    Returns:
        The ``Task`` items of this batch, in the order the queue
        returned them.

    Raises:
        AssertionError: if an item is none of the types above (only while
            assertions are enabled).
    """
    # Always request at least one item, even when nothing is tracked as running.
    batch_size = max(1, len(self.running_works))
    completed = []
    for entry in self.cq.pop(count=batch_size):
        if isinstance(entry, Probe):
            # Keep only the greatest probe seen so far.
            self.last_acked_probe = max(self.last_acked_probe, entry)
        elif isinstance(entry, StopWorkItem):
            self.pop_running_work(entry.work_to_stop)
            logger.debug(f"work item stopped: {entry}")
        elif isinstance(entry, StopExecutor):
            # The flag is sticky: once set, later items cannot clear it.
            self.stop_request_acked |= self.state != ExecutorState.STOPPED
            logger.info(f"executor stopped: {entry}")
        else:
            assert isinstance(entry, Task), f"unexpected work item type: {entry}"
            # Stamp a finish time only if the task did not bring its own.
            if not entry.finish_time:
                entry.finish_time = time.time()
            completed.append(entry)
            # INCOMPLETE tasks stay registered in the running set.
            if entry.status != WorkStatus.INCOMPLETE:
                self.pop_running_work(entry.key)
    return completed