in metaflow/runtime.py [0:0]
def clone_original_run(
    self, generate_task_obj: bool = False, verbose: bool = True
) -> None:
    """Clone every successful, not-to-be-rerun task of the origin run.

    Two passes over ``self._origin_ds_set``:

    1. Classify unbounded-foreach (UBF) tasks into control vs. mapper tasks.
       Mapper tasks are cloned only if their control task finished
       successfully (the control task records its mappers in
       ``_control_mapper_tasks``).
    2. Collect the clone arguments for every eligible task.

    The actual cloning is fanned out to ``self.clone_task`` on a thread pool
    (``self._max_workers`` workers); once all clones finish, the parameters
    task is marked resume-done.

    Parameters
    ----------
    generate_task_obj : bool, default False
        Forwarded to ``clone_task`` — suppressed for UBF mapper tasks
        (only non-mapper tasks get a task object generated).
    verbose : bool, default True
        Forwarded to ``clone_task`` to control per-task logging.
    """
    self._logger(
        "Cloning {}/{}".format(self._flow.name, self._clone_run_id),
        system_msg=True,
    )
    # Each entry: (step_name, task_id, pathspec_index,
    #              cloned_task_pathspec_index, finished_tuple,
    #              is_ubf_mapper_task, ubf_context)
    inputs = []
    ubf_mapper_tasks_to_clone = set()
    ubf_control_tasks = set()
    # We only clone ubf mapper tasks if the control task is complete.
    # Here we need to check which control tasks are complete, and then get the corresponding
    # mapper tasks.
    for task_ds in self._origin_ds_set:
        _, step_name, task_id = task_ds.pathspec.split("/")
        pathspec_index = task_ds.pathspec_index
        if task_ds["_task_ok"] and step_name != "_parameters":
            # Control task contains "_control_mapper_tasks" but, in the case of
            # @parallel decorator, the control task is also a mapper task so we
            # need to distinguish this using _control_task_is_mapper_zero
            control_mapper_tasks = (
                []
                if "_control_mapper_tasks" not in task_ds
                else task_ds["_control_mapper_tasks"]
            )
            if control_mapper_tasks:
                if task_ds.get("_control_task_is_mapper_zero", False):
                    # Strip out the control task of list of mapper tasks
                    # (the control task doubles as mapper 0, so it is the
                    # first entry in control_mapper_tasks).
                    ubf_control_tasks.add(control_mapper_tasks[0])
                    ubf_mapper_tasks_to_clone.update(control_mapper_tasks[1:])
                else:
                    ubf_mapper_tasks_to_clone.update(control_mapper_tasks)
                    # Since we only add mapper tasks here, if we are not in the list
                    # we are a control task
                    if task_ds.pathspec not in ubf_mapper_tasks_to_clone:
                        ubf_control_tasks.add(task_ds.pathspec)
    # Second pass: gather clone arguments for every task that succeeded,
    # is not the parameters pseudo-step, and is not scheduled for a rerun.
    for task_ds in self._origin_ds_set:
        _, step_name, task_id = task_ds.pathspec.split("/")
        pathspec_index = task_ds.pathspec_index
        if (
            task_ds["_task_ok"]
            and step_name != "_parameters"
            and (step_name not in self._steps_to_rerun)
        ):
            # "_unbounded_foreach" is a special flag to indicate that the transition
            # is an unbounded foreach.
            # Both parent and splitted children tasks will have this flag set.
            # The splitted control/mapper tasks
            # are not foreach types because UBF is always followed by a join step.
            is_ubf_task = (
                "_unbounded_foreach" in task_ds and task_ds["_unbounded_foreach"]
            ) and (self._graph[step_name].type != "foreach")
            is_ubf_control_task = task_ds.pathspec in ubf_control_tasks
            # A UBF task that was not classified as control is a mapper.
            is_ubf_mapper_task = is_ubf_task and (not is_ubf_control_task)
            if is_ubf_mapper_task and (
                task_ds.pathspec not in ubf_mapper_tasks_to_clone
            ):
                # Skip copying UBF mapper tasks if control task is incomplete.
                continue
            ubf_context = None
            if is_ubf_task:
                # NOTE(review): "ubf_test" is the mapper-task context value;
                # presumably matches the UBF_TASK constant used elsewhere in
                # metaflow — confirm before renaming.
                ubf_context = "ubf_test" if is_ubf_mapper_task else "ubf_control"
            # Zero out the foreach-stack values; only the indices matter for
            # identifying the cloned task's position in the foreach nest.
            finished_tuple = tuple(
                [s._replace(value=0) for s in task_ds.get("_foreach_stack", ())]
            )
            # pathspec_index is "<run-ish prefix>/<index part>"; keep only the
            # index part for the cloned task.
            cloned_task_pathspec_index = pathspec_index.split("/")[1]
            if task_ds.get("_control_task_is_mapper_zero", False):
                # Replace None with index 0 for control task as it is part of the
                # UBF (as a mapper as well)
                finished_tuple = finished_tuple[:-1] + (
                    finished_tuple[-1]._replace(index=0),
                )
                # We need this reverse override though because when we check
                # if a task has been cloned in _queue_push, the index will be None
                # because the _control_task_is_mapper_zero is set in the control
                # task *itself* and *not* in the one that is launching the UBF nest.
                # This means that _translate_index will use None.
                # The regex rewrites a trailing "...0]" into "...None]" in the
                # bracketed index list of the pathspec index string.
                cloned_task_pathspec_index = re.sub(
                    r"(\[(?:\d+, ?)*)0\]",
                    lambda m: (m.group(1) or "[") + "None]",
                    cloned_task_pathspec_index,
                )
            inputs.append(
                (
                    step_name,
                    task_id,
                    pathspec_index,
                    cloned_task_pathspec_index,
                    finished_tuple,
                    is_ubf_mapper_task,
                    ubf_context,
                )
            )
    # Fan the clone operations out to a thread pool and block until all done.
    with futures.ThreadPoolExecutor(max_workers=self._max_workers) as executor:
        all_tasks = [
            executor.submit(
                self.clone_task,
                step_name,
                task_id,
                pathspec_index,
                cloned_task_pathspec_index,
                finished_tuple,
                ubf_context=ubf_context,
                # Mapper tasks never get a task object generated.
                generate_task_obj=generate_task_obj and (not is_ubf_mapper_task),
                verbose=verbose,
            )
            for (
                step_name,
                task_id,
                pathspec_index,
                cloned_task_pathspec_index,
                finished_tuple,
                is_ubf_mapper_task,
                ubf_context,
            ) in inputs
        ]
        _, _ = futures.wait(all_tasks)
    self._logger(
        "{}/{} cloned!".format(self._flow.name, self._clone_run_id), system_msg=True
    )
    # Record that the resume/clone phase completed on the parameters task.
    self._params_task.mark_resume_done()