def next()

in metaflow/flowspec.py [0:0]


    def next(self, *dsts: Callable[..., None], **kwargs) -> None:
        """
        Indicates the next step to execute after this step has completed.

        This statement should appear as the last statement of each step, except
        the end step.

        There are several valid formats to specify the next step:

        - Straight-line connection: `self.next(self.next_step)` where `next_step` is a method in
          the current class decorated with the `@step` decorator.

        - Static fan-out connection: `self.next(self.step1, self.step2, ...)` where `stepX` are
          methods in the current class decorated with the `@step` decorator.

        - Foreach branch:
          ```
          self.next(self.foreach_step, foreach='foreach_iterator')
          ```
          In this situation, `foreach_step` is a method in the current class decorated with the
          `@step` decorator and `foreach_iterator` is a variable name in the current class that
          evaluates to an iterator. A task will be launched for each value in the iterator and
          each task will execute the code specified by the step `foreach_step`.

        Parameters
        ----------
        dsts : Callable[..., None]
            One or more methods annotated with `@step`.
        foreach : str, optional
            Name of an attribute of this object to iterate over. Must be a
            string, must be combined with exactly one destination, and the
            attribute must exist. Unless the attribute is an
            `UnboundedForeachInput`, it is iterated here to count the splits
            and must yield at least one item.
        num_parallel : int, optional
            When given and >= 1, at most one destination is allowed and the
            transition is recorded as a foreach over the internal
            `_parallel_ubf_iter` attribute, which is set to
            `ParallelUBF(num_parallel)`; any `foreach` value passed alongside
            is overridden. Values below 1 are ignored.

        Raises
        ------
        InvalidNextException
            Raised if the format of the arguments does not match one of the ones given above:
            an unknown keyword argument, a repeated `self.next()` call in the same step,
            a destination that is not a method of this object, an invalid `foreach`
            argument, or no destination at all.
        """

        step = self._current_step

        foreach = kwargs.pop("foreach", None)
        num_parallel = kwargs.pop("num_parallel", None)
        if kwargs:
            # `next` here is the builtin: the method name lives in the class
            # namespace and is not visible as a bare name inside this body.
            kw = next(iter(kwargs))
            msg = (
                "Step *{step}* passes an unknown keyword argument "
                "'{invalid}' to self.next().".format(step=step, invalid=kw)
            )
            raise InvalidNextException(msg)

        # check: next() is called only once
        if self._transition is not None:
            msg = (
                "Multiple self.next() calls detected in step *{step}*. "
                "Call self.next() only once.".format(step=step)
            )
            raise InvalidNextException(msg)

        # check: all destinations are methods of this object
        funcs = []
        for i, dst in enumerate(dsts):
            try:
                # Only bound methods expose __func__; anything else fails here.
                name = dst.__func__.__name__
            except Exception as err:
                # `Exception` (not a bare except) so that KeyboardInterrupt and
                # SystemExit are never re-labelled as a user error.
                msg = (
                    "In step *{step}* the {arg}. argument in self.next() is "
                    "not a function. Make sure all arguments in self.next() "
                    "are methods of the Flow class.".format(step=step, arg=i + 1)
                )
                raise InvalidNextException(msg) from err
            if not hasattr(self, name):
                msg = (
                    "Step *{step}* specifies a self.next() transition to an "
                    "unknown step, *{name}*.".format(step=step, name=name)
                )
                raise InvalidNextException(msg)
            funcs.append(name)

        if num_parallel is not None and num_parallel >= 1:
            if len(dsts) > 1:
                raise InvalidNextException(
                    "Only one destination allowed when num_parallel used in self.next()"
                )
            # A parallel step is expressed as a foreach over an internal attribute.
            foreach = "_parallel_ubf_iter"
            self._parallel_ubf_iter = ParallelUBF(num_parallel)

        # check: foreach is valid
        # Test against None rather than truthiness: a falsy value such as ""
        # or 0 is still a (malformed) foreach request and must be rejected
        # instead of silently bypassing every check below.
        if foreach is not None:
            if not isinstance(foreach, basestring):
                msg = (
                    "Step *{step}* has an invalid self.next() transition. "
                    "The argument to 'foreach' must be a string.".format(step=step)
                )
                raise InvalidNextException(msg)

            if len(dsts) != 1:
                msg = (
                    "Step *{step}* has an invalid self.next() transition. "
                    "Specify exactly one target for 'foreach'.".format(step=step)
                )
                raise InvalidNextException(msg)

            try:
                foreach_iter = getattr(self, foreach)
            except Exception as err:
                msg = (
                    "Foreach variable *self.{var}* in step *{step}* "
                    "does not exist. Check your variable.".format(
                        step=step, var=foreach
                    )
                )
                raise InvalidNextException(msg) from err
            self._foreach_values = None
            if isinstance(foreach_iter, UnboundedForeachInput):
                # The number of splits is not computed here for unbounded foreach.
                self._unbounded_foreach = True
                self._foreach_num_splits = None
                self._validate_ubf_step(funcs[0])
            else:
                try:
                    if INCLUDE_FOREACH_STACK:
                        # Keep a value per item in addition to the count.
                        self._foreach_values = []
                        for item in foreach_iter:
                            value = self._get_foreach_item_value(item)
                            self._foreach_values.append(value)
                        self._foreach_num_splits = len(self._foreach_values)
                    else:
                        # Only the count is needed; avoid materializing items.
                        self._foreach_num_splits = sum(1 for _ in foreach_iter)
                except Exception as e:
                    msg = (
                        "Foreach variable *self.{var}* in step *{step}* "
                        "is not iterable. Please check details: {err}".format(
                            step=step, var=foreach, err=str(e)
                        )
                    )
                    raise InvalidNextException(msg) from e

                if self._foreach_num_splits == 0:
                    msg = (
                        "Foreach iterator over *{var}* in step *{step}* "
                        "produced zero splits. Check your variable.".format(
                            step=step, var=foreach
                        )
                    )
                    raise InvalidNextException(msg)

            self._foreach_var = foreach

        # check: non-keyword transitions are valid
        if foreach is None and not dsts:
            msg = (
                "Step *{step}* has an invalid self.next() transition. "
                "Specify at least one step function as an argument in "
                "self.next().".format(step=step)
            )
            raise InvalidNextException(msg)

        # Record the transition; a non-None value is what the
        # "called only once" check above detects on a second call.
        self._transition = (funcs, foreach)