in metaflow/flowspec.py [0:0]
def next(self, *dsts: Callable[..., None], **kwargs) -> None:
"""
Indicates the next step to execute after this step has completed.
This statement should appear as the last statement of each step, except
the end step.
There are several valid formats to specify the next step:
- Straight-line connection: `self.next(self.next_step)` where `next_step` is a method in
the current class decorated with the `@step` decorator.
- Static fan-out connection: `self.next(self.step1, self.step2, ...)` where `stepX` are
methods in the current class decorated with the `@step` decorator.
- Foreach branch:
```
self.next(self.foreach_step, foreach='foreach_iterator')
```
In this situation, `foreach_step` is a method in the current class decorated with the
`@step` decorator and `foreach_iterator` is a variable name in the current class that
evaluates to an iterator. A task will be launched for each value in the iterator and
each task will execute the code specified by the step `foreach_step`.
Parameters
----------
dsts : Callable[..., None]
One or more methods annotated with `@step`.
Raises
------
InvalidNextException
Raised if the format of the arguments does not match one of the ones given above.
"""
step = self._current_step
foreach = kwargs.pop("foreach", None)
num_parallel = kwargs.pop("num_parallel", None)
if kwargs:
kw = next(iter(kwargs))
msg = (
"Step *{step}* passes an unknown keyword argument "
"'{invalid}' to self.next().".format(step=step, invalid=kw)
)
raise InvalidNextException(msg)
# check: next() is called only once
if self._transition is not None:
msg = (
"Multiple self.next() calls detected in step *{step}*. "
"Call self.next() only once.".format(step=step)
)
raise InvalidNextException(msg)
# check: all destinations are methods of this object
funcs = []
for i, dst in enumerate(dsts):
try:
name = dst.__func__.__name__
except:
msg = (
"In step *{step}* the {arg}. argument in self.next() is "
"not a function. Make sure all arguments in self.next() "
"are methods of the Flow class.".format(step=step, arg=i + 1)
)
raise InvalidNextException(msg)
if not hasattr(self, name):
msg = (
"Step *{step}* specifies a self.next() transition to an "
"unknown step, *{name}*.".format(step=step, name=name)
)
raise InvalidNextException(msg)
funcs.append(name)
if num_parallel is not None and num_parallel >= 1:
if len(dsts) > 1:
raise InvalidNextException(
"Only one destination allowed when num_parallel used in self.next()"
)
foreach = "_parallel_ubf_iter"
self._parallel_ubf_iter = ParallelUBF(num_parallel)
# check: foreach is valid
if foreach:
if not isinstance(foreach, basestring):
msg = (
"Step *{step}* has an invalid self.next() transition. "
"The argument to 'foreach' must be a string.".format(step=step)
)
raise InvalidNextException(msg)
if len(dsts) != 1:
msg = (
"Step *{step}* has an invalid self.next() transition. "
"Specify exactly one target for 'foreach'.".format(step=step)
)
raise InvalidNextException(msg)
try:
foreach_iter = getattr(self, foreach)
except:
msg = (
"Foreach variable *self.{var}* in step *{step}* "
"does not exist. Check your variable.".format(
step=step, var=foreach
)
)
raise InvalidNextException(msg)
self._foreach_values = None
if issubclass(type(foreach_iter), UnboundedForeachInput):
self._unbounded_foreach = True
self._foreach_num_splits = None
self._validate_ubf_step(funcs[0])
else:
try:
if INCLUDE_FOREACH_STACK:
self._foreach_values = []
for item in foreach_iter:
value = self._get_foreach_item_value(item)
self._foreach_values.append(value)
self._foreach_num_splits = len(self._foreach_values)
else:
self._foreach_num_splits = sum(1 for _ in foreach_iter)
except Exception as e:
msg = (
"Foreach variable *self.{var}* in step *{step}* "
"is not iterable. Please check details: {err}".format(
step=step, var=foreach, err=str(e)
)
)
raise InvalidNextException(msg)
if self._foreach_num_splits == 0:
msg = (
"Foreach iterator over *{var}* in step *{step}* "
"produced zero splits. Check your variable.".format(
step=step, var=foreach
)
)
raise InvalidNextException(msg)
self._foreach_var = foreach
# check: non-keyword transitions are valid
if foreach is None:
if len(dsts) < 1:
msg = (
"Step *{step}* has an invalid self.next() transition. "
"Specify at least one step function as an argument in "
"self.next().".format(step=step)
)
raise InvalidNextException(msg)
self._transition = (funcs, foreach)