def start_speculative_execution()

in smallpond/execution/scheduler.py [0:0]


    def start_speculative_execution(self):
        """Enqueue retries for running tasks that are much slower than their peers.

        Walks every work item currently running on ``self.working_executors``.
        An item is considered only if it is a ``Task`` that has not succeeded,
        allows speculative execution, is below ``self.max_retry_count`` and has
        the same ``retry_count`` as the entry stored in ``self.tasks`` under its
        key. The task's logical node must also expose "elapsed wall time (secs)"
        statistics with ``cnt >= 20``.

        The retry threshold is never lower than
        ``self.ctx.secs_executor_probe_timeout`` and is chosen as follows:

        * scheduling queue shorter than the number of good executors:
          ``p95 - p50``;
        * otherwise, aggressive mode enabled and at least
          ``self.ctx.num_executors`` good executors: ``p99 - p50``;
        * otherwise: ``p99 - p50`` scaled by ``2 + item.retry_count``.

        When the task's elapsed time exceeds ``p50`` by at least that threshold,
        a warning is logged and the task returned by ``self.get_retry_task`` is
        passed to ``self.try_enqueue``.
        """
        for executor in self.working_executors:
            for item in executor.running_works.values():
                # Eligibility checks keep the original short-circuit order, so
                # `self.tasks[item.key]` is only looked up for candidate tasks.
                if not (
                    isinstance(item, Task)
                    and item.key not in self.succeeded_tasks
                    and item.allow_speculative_exec
                    and item.retry_count < self.max_retry_count
                    # NOTE(review): presumably skips attempts that already have
                    # a newer retry registered in `self.tasks` -- confirm.
                    and item.retry_count == self.tasks[item.key].retry_count
                ):
                    continue

                logical_node = self.logical_nodes.get(item.node_id)
                if logical_node is None:
                    continue

                perf_stats = logical_node.get_perf_stats("elapsed wall time (secs)")
                # Too few recorded samples: percentiles are not used for decisions.
                if perf_stats is None or perf_stats.cnt < 20:
                    continue

                probe_timeout = self.ctx.secs_executor_probe_timeout
                # Re-evaluated per candidate: `try_enqueue` below may change the
                # length of `self.sched_queue` while this loop is running.
                if len(self.sched_queue) < len(self.good_executors):
                    retry_threshold = max(probe_timeout, perf_stats.p95 - perf_stats.p50)
                else:
                    retry_threshold = max(probe_timeout, perf_stats.p99 - perf_stats.p50)
                    aggressive_retry = (
                        self.aggressive_speculative_exec
                        and len(self.good_executors) >= self.ctx.num_executors
                    )
                    if not aggressive_retry:
                        # Back off further for every retry already attempted.
                        retry_threshold *= 2 + item.retry_count

                excess_time = item.elapsed_time - perf_stats.p50
                if excess_time >= retry_threshold:
                    logger.warning(
                        f"retry long-running task: {repr(item)} on {repr(executor)}, elapsed time: {item.elapsed_time:.1f} secs, elapsed time stats: {perf_stats}"
                    )
                    self.try_enqueue(self.get_retry_task(item.key))