def execute()

in metaflow/runtime.py [0:0]


    def execute(self):
        """Run the main scheduling loop and report the outcome of the run.

        The run queue is seeded with the ``start`` step unless cloned tasks
        are present, in which case the queue starts empty and the cloned
        tasks are handed to the loop as its first batch of finished tasks.
        Each loop iteration collects finished tasks, queues the tasks they
        trigger and launches workers. A progress summary is logged whenever
        more than ``PROGRESS_INTERVAL`` seconds have passed since the last
        one. When the loop exits, normally or through an exception, every
        decorator's ``runtime_finished`` hook is called with the exception
        that ended the loop (``None`` on a clean exit).

        Raises:
            MetaflowInternalError: if ``("end", ())`` is not in
                ``self._finished`` after the loop and this is not a
                clone-only run.
            KeyboardInterrupt, Exception: anything raised inside the loop is
                re-raised after being logged and after ``_killall()`` ran.
        """
        if self._cloned_tasks:
            # The cloned tasks are consumed by the loop below as its first
            # batch of finished tasks, so start from an empty run queue with
            # no active tasks instead of pushing the start step.
            self._run_queue = []
            self._active_tasks[0] = 0
        elif self._params_task:
            self._queue_push("start", {"input_paths": [self._params_task.path]})
        else:
            self._queue_push("start", {})

        progress_tstamp = time.time()
        with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as config_file:
            # Configurations are passed through a file to avoid overloading the
            # command-line. We only need to create this file once and it can be
            # reused for any task launch.
            config_value = dump_config_values(self._flow)
            if config_value:
                json.dump(config_value, config_file)
                config_file.flush()
                self._config_file_name = config_file.name
            else:
                self._config_file_name = None

            # Handed to the decorators' runtime_finished hooks in the finally
            # clause; stays None unless the loop is aborted by an exception.
            exception = None
            try:
                # main scheduling loop
                while (
                    self._run_queue or self._active_tasks[0] > 0 or self._cloned_tasks
                ):
                    # 1. are any of the current workers finished?
                    if self._cloned_tasks:
                        finished_tasks = self._cloned_tasks
                        # reset the list of cloned tasks and let poll_workers
                        # handle the remaining transition
                        self._cloned_tasks = []
                    else:
                        finished_tasks = list(self._poll_workers())
                    # 2. push new tasks triggered by the finished tasks to the queue
                    self._queue_tasks(finished_tasks)
                    # 3. if there are available worker slots, pop and start tasks
                    #    from the queue.
                    self._launch_workers()

                    if time.time() - progress_tstamp > PROGRESS_INTERVAL:
                        progress_tstamp = time.time()

                        # Key 0 holds the overall running count; every other
                        # key maps to a (running, done) pair.
                        tasks_print = ", ".join(
                            "%s (%d running; %d done)" % (k, v[0], v[1])
                            for k, v in self._active_tasks.items()
                            if k != 0 and v[0] > 0
                        )
                        num_running = self._active_tasks[0]
                        if num_running == 0:
                            msg = "No tasks are running."
                        else:
                            if num_running == 1:
                                msg = "1 task is running: "
                            else:
                                msg = "%d tasks are running: " % num_running
                            msg += "%s." % tasks_print
                        self._logger(msg, system_msg=True)

                        num_queued = len(self._run_queue)
                        if num_queued == 0:
                            msg = "No tasks are waiting in the queue."
                        elif num_queued == 1:
                            # Nothing is listed after this message, so it ends
                            # with a period like the other two variants.
                            msg = "1 task is waiting in the queue."
                        else:
                            msg = "%d tasks are waiting in the queue." % num_queued
                        self._logger(msg, system_msg=True)

                        if self._unprocessed_steps:
                            if len(self._unprocessed_steps) == 1:
                                msg = "%s step has not started" % (
                                    next(iter(self._unprocessed_steps)),
                                )
                            else:
                                msg = "%d steps have not started: " % len(
                                    self._unprocessed_steps
                                )
                                msg += "%s." % ", ".join(self._unprocessed_steps)
                            self._logger(msg, system_msg=True)

            except KeyboardInterrupt as ex:
                # KeyboardInterrupt is not an Exception subclass, hence the
                # dedicated handler.
                self._logger("Workflow interrupted.", system_msg=True, bad=True)
                self._killall()
                exception = ex
                raise
            except Exception as ex:
                self._logger("Workflow failed.", system_msg=True, bad=True)
                self._killall()
                exception = ex
                raise
            finally:
                # on finish clean tasks
                for step in self._flow:
                    for deco in step.decorators:
                        deco.runtime_finished(exception)

        # assert that end was executed and it was successful
        if ("end", ()) in self._finished:
            if self._run_url:
                self._logger(
                    "Done! See the run in the UI at %s" % self._run_url,
                    system_msg=True,
                )
            else:
                self._logger("Done!", system_msg=True)
        elif self._clone_only:
            self._logger(
                "Clone-only resume complete -- only previously successful steps were "
                "cloned; no new tasks executed!",
                system_msg=True,
            )
            # NOTE(review): assumes _params_task is always set for clone-only
            # runs; the start-up branch above treats it as optional -- confirm.
            self._params_task.mark_resume_done()
        else:
            raise MetaflowInternalError(
                "The *end* step was not successful by the end of flow."
            )