def clone_original_run()

in metaflow/runtime.py [0:0]

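Clones the successfully finished tasks of the origin run (self._clone_run_id) into the current run, skipping any steps listed in self._steps_to_rerun, and marks the resume as done once cloning completes.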

    def clone_original_run(self, generate_task_obj=False, verbose=True):
        self._logger(
            "Cloning {}/{}".format(self._flow.name, self._clone_run_id),
            system_msg=True,
        )

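        # Arguments for clone_task, one tuple per task to clone (collected in the
        # second pass below).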
        inputs = []

        ubf_mapper_tasks_to_clone = set()
        ubf_control_tasks = set()
        # We only clone UBF mapper tasks if their control task is complete.
        # First determine which control tasks are complete, then record the
        # corresponding mapper tasks.
        for task_ds in self._origin_ds_set:
            _, step_name, task_id = task_ds.pathspec.split("/")
            pathspec_index = task_ds.pathspec_index
            if task_ds["_task_ok"] and step_name != "_parameters":
                # A control task contains "_control_mapper_tasks" but, in the case of
                # the @parallel decorator, the control task is also a mapper task, so
                # we need to distinguish the two using _control_task_is_mapper_zero.
                control_mapper_tasks = (
                    []
                    if "_control_mapper_tasks" not in task_ds
                    else task_ds["_control_mapper_tasks"]
                )
                if control_mapper_tasks:
                    if task_ds.get("_control_task_is_mapper_zero", False):
                        # Strip the control task out of the list of mapper tasks
                        ubf_control_tasks.add(control_mapper_tasks[0])
                        ubf_mapper_tasks_to_clone.update(control_mapper_tasks[1:])
                    else:
                        ubf_mapper_tasks_to_clone.update(control_mapper_tasks)
                        # Since we only add mapper tasks here, a task that is not
                        # in the set is a control task.
                        if task_ds.pathspec not in ubf_mapper_tasks_to_clone:
                            ubf_control_tasks.add(task_ds.pathspec)

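        # Second pass: collect clone arguments for every successful task that is not
        # being re-run.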
        for task_ds in self._origin_ds_set:
            _, step_name, task_id = task_ds.pathspec.split("/")
            pathspec_index = task_ds.pathspec_index

            if (
                task_ds["_task_ok"]
                and step_name != "_parameters"
                and (step_name not in self._steps_to_rerun)
            ):
                # "_unbounded_foreach" is a special flag indicating that the transition
                # is an unbounded foreach.
                # Both the parent task and the split children tasks have this flag set.
                # The split control/mapper tasks are not foreach types because UBF is
                # always followed by a join step.
                is_ubf_task = (
                    "_unbounded_foreach" in task_ds and task_ds["_unbounded_foreach"]
                ) and (self._graph[step_name].type != "foreach")

                is_ubf_control_task = task_ds.pathspec in ubf_control_tasks

                is_ubf_mapper_task = is_ubf_task and (not is_ubf_control_task)

                if is_ubf_mapper_task and (
                    task_ds.pathspec not in ubf_mapper_tasks_to_clone
                ):
                    # Skip cloning UBF mapper tasks whose control task is incomplete.
                    continue

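                # Note: "ubf_test" and "ubf_control" are the values of the UBF_TASK
                # and UBF_CONTROL context constants in metaflow.unbounded_foreach.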
                ubf_context = None
                if is_ubf_task:
                    ubf_context = "ubf_test" if is_ubf_mapper_task else "ubf_control"

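                # Zero out the value field of each foreach frame; only the structural
                # position of the task in the foreach nest is needed here.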
                finished_tuple = tuple(
                    [s._replace(value=0) for s in task_ds.get("_foreach_stack", ())]
                )
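                # pathspec_index has the form "<run_id>/<step_name>[<foreach indices>]";
                # keep only the "<step_name>[...]" part for the cloned task.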
                cloned_task_pathspec_index = pathspec_index.split("/")[1]
                if task_ds.get("_control_task_is_mapper_zero", False):
                    # Replace the None index with 0 for the control task, since it is
                    # also part of the UBF (as a mapper).
                    finished_tuple = finished_tuple[:-1] + (
                        finished_tuple[-1]._replace(index=0),
                    )
                    # We need the reverse override below because, when _queue_push checks
                    # whether a task has been cloned, the index will be None:
                    # _control_task_is_mapper_zero is set on the control task *itself*
                    # and *not* on the task that launches the UBF nest, which means
                    # _translate_index will use None.
                    cloned_task_pathspec_index = re.sub(
                        r"(\[(?:\d+, ?)*)0\]",
                        lambda m: (m.group(1) or "[") + "None]",
                        cloned_task_pathspec_index,
                    )

                inputs.append(
                    (
                        step_name,
                        task_id,
                        pathspec_index,
                        cloned_task_pathspec_index,
                        finished_tuple,
                        is_ubf_mapper_task,
                        ubf_context,
                    )
                )

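        # Clone all selected tasks concurrently, bounded by self._max_workers threads.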
        with futures.ThreadPoolExecutor(max_workers=self._max_workers) as executor:
            all_tasks = [
                executor.submit(
                    self.clone_task,
                    step_name,
                    task_id,
                    pathspec_index,
                    cloned_task_pathspec_index,
                    finished_tuple,
                    ubf_context=ubf_context,
                    generate_task_obj=generate_task_obj and (not is_ubf_mapper_task),
                    verbose=verbose,
                )
                for (
                    step_name,
                    task_id,
                    pathspec_index,
                    cloned_task_pathspec_index,
                    finished_tuple,
                    is_ubf_mapper_task,
                    ubf_context,
                ) in inputs
            ]
            _, _ = futures.wait(all_tasks)
        self._logger(
            "{}/{} cloned!".format(self._flow.name, self._clone_run_id), system_msg=True
        )
        self._params_task.mark_resume_done()
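
The index rewrite above is easiest to see on concrete values. Below is a minimal standalone sketch of the same substitution; the step name and indices are made up for illustration.

    import re

    # Same substitution as in clone_original_run: a trailing foreach index of 0
    # (mapper zero, i.e. the control task) is rewritten back to None so that it
    # matches the index _translate_index computes when the task is queued.
    for idx in ["ubf_step[0]", "ubf_step[3, 0]"]:
        print(
            idx,
            "->",
            re.sub(
                r"(\[(?:\d+, ?)*)0\]",
                lambda m: (m.group(1) or "[") + "None]",
                idx,
            ),
        )
    # prints:
    #   ubf_step[0] -> ubf_step[None]
    #   ubf_step[3, 0] -> ubf_step[3, None]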