in smallpond/execution/scheduler.py [0:0]
def probe_executors(self):
secs_since_last_executor_probe = time.time() - self.last_executor_probe_time
if secs_since_last_executor_probe >= self.ctx.secs_executor_probe_interval:
# discover new executors
with os.scandir(self.ctx.queue_root) as dir_iter:
for entry in dir_iter:
if entry.is_dir():
_, exec_id = os.path.split(entry.path)
if exec_id not in self.available_executors:
self.available_executors[exec_id] = RemoteExecutor.create(self.ctx, exec_id, entry.path, self.probe_epoch)
logger.info(f"find a new executor #{len(self.available_executors)}: {self.available_executors[exec_id]}")
self.clear_cached_executor_lists()
# start a new probe epoch
self.last_executor_probe_time = time.time()
self.probe_epoch += 1
logger.info(
f"send a new round of probes #{self.probe_epoch} to {len(self.working_executors)} working executors: {self.working_executors}"
)
for executor in self.working_executors:
executor.probe(self.probe_epoch)
# start speculative execution of tasks
if not self.disable_speculative_exec:
self.start_speculative_execution()