in smallpond/execution/scheduler.py [0:0]
def start_speculative_execution(self):
    """Enqueue speculative retries for running tasks that look like stragglers.

    Every item in ``running_works`` of each executor in
    ``self.working_executors`` is inspected. An item is a retry candidate
    only if all of the following hold:

    * it is a ``Task`` whose key is not in ``self.succeeded_tasks``;
    * ``item.allow_speculative_exec`` is truthy;
    * ``item.retry_count`` is below ``self.max_retry_count`` and equal to the
      retry count recorded in ``self.tasks[item.key]``;
    * its logical node is present in ``self.logical_nodes`` and reports
      "elapsed wall time (secs)" stats built from at least 20 samples.

    A candidate is retried when ``item.elapsed_time - p50`` reaches the retry
    threshold. The threshold is never below
    ``self.ctx.secs_executor_probe_timeout`` and otherwise depends on load:

    * scheduling queue shorter than the number of good executors:
      ``p95 - p50``;
    * otherwise, aggressive mode enabled and
      ``len(self.good_executors) >= self.ctx.num_executors``: ``p99 - p50``;
    * otherwise: ``(p99 - p50) * (2 + item.retry_count)``.

    Retried tasks are logged at warning level and passed to
    ``self.try_enqueue(self.get_retry_task(item.key))``.
    """
    for executor in self.working_executors:
        for item in executor.running_works.values():
            # Cheap eligibility checks first; the order matters because the
            # later conditions dereference Task-only attributes.
            if not (
                isinstance(item, Task)
                and item.key not in self.succeeded_tasks
                and item.allow_speculative_exec
                and item.retry_count < self.max_retry_count
                # NOTE(review): presumably this skips attempts that have
                # already been superseded by a newer retry -- confirm
                # against get_retry_task().
                and item.retry_count == self.tasks[item.key].retry_count
            ):
                continue

            logical_node = self.logical_nodes.get(item.node_id)
            if logical_node is None:
                continue

            perf_stats = logical_node.get_perf_stats("elapsed wall time (secs)")
            # Percentiles from fewer than 20 samples are not used.
            if perf_stats is None or perf_stats.cnt < 20:
                continue

            # Sampled per candidate rather than once before the loops:
            # try_enqueue() below presumably changes len(self.sched_queue),
            # so a value hoisted out of the loop could go stale.
            aggressive_retry = self.aggressive_speculative_exec and len(self.good_executors) >= self.ctx.num_executors
            short_sched_queue = len(self.sched_queue) < len(self.good_executors)

            # With a short queue the p95 tail is used instead of p99, which
            # gives a threshold that is never larger.
            tail_latency = perf_stats.p95 if short_sched_queue else perf_stats.p99
            retry_threshold = max(
                self.ctx.secs_executor_probe_timeout,
                tail_latency - perf_stats.p50,
            )
            if not (short_sched_queue or aggressive_retry):
                # Default mode: scale the threshold up with every retry
                # this task has already had.
                retry_threshold *= 2 + item.retry_count

            excess_time = item.elapsed_time - perf_stats.p50
            if excess_time >= retry_threshold:
                logger.warning(
                    f"retry long-running task: {repr(item)} on {repr(executor)}, elapsed time: {item.elapsed_time:.1f} secs, elapsed time stats: {perf_stats}"
                )
                self.try_enqueue(self.get_retry_task(item.key))