in metaflow/runtime.py [0:0]
def execute(self):
    """Run the scheduling loop until no queued, running or cloned tasks remain.

    The method seeds the run queue (unless cloned tasks are present), writes
    the flow's configuration values to a temporary file that lives for the
    duration of the loop, and then repeatedly:

    1. collects finished tasks (the cloned tasks on the first pass, otherwise
       whatever ``_poll_workers`` yields),
    2. queues the tasks they trigger via ``_queue_tasks``,
    3. starts queued tasks via ``_launch_workers``,
    4. logs a progress summary when more than ``PROGRESS_INTERVAL`` seconds
       have passed since the previous summary.

    When the loop ends, for any reason, every decorator of every step in the
    flow gets ``runtime_finished(exception)`` called, where ``exception`` is
    the error that aborted the loop or ``None``.

    Raises:
        KeyboardInterrupt: re-raised after logging and ``_killall()``.
        Exception: any error from the loop, re-raised after logging and
            ``_killall()``.
        MetaflowInternalError: if the loop finished but ``("end", ())`` is
            not in ``self._finished`` and this is not a clone-only run.
    """
    if self._cloned_tasks:
        # Cloned tasks are present: start from an empty run queue and a zero
        # running-task count. The first loop iteration below treats the
        # cloned tasks as finished and queues whatever they trigger.
        self._run_queue = []
        self._active_tasks[0] = 0
    elif self._params_task:
        self._queue_push("start", {"input_paths": [self._params_task.path]})
    else:
        self._queue_push("start", {})

    progress_tstamp = time.time()
    with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as config_file:
        # Configurations are passed through a file to avoid overloading the
        # command-line. We only need to create this file once and it can be
        # reused for any task launch.
        config_value = dump_config_values(self._flow)
        if config_value:
            json.dump(config_value, config_file)
            # Flush so the content is on disk before anything reads the file
            # by name.
            config_file.flush()
            self._config_file_name = config_file.name
        else:
            self._config_file_name = None

        # Bound before the ``try`` so the ``finally`` clause can always read
        # it, no matter where the loop is interrupted.
        exception = None
        try:
            # main scheduling loop
            while (
                self._run_queue or self._active_tasks[0] > 0 or self._cloned_tasks
            ):
                # 1. are any of the current workers finished?
                if self._cloned_tasks:
                    finished_tasks = self._cloned_tasks
                    # reset the list of cloned tasks and let poll_workers
                    # handle the remaining transition
                    self._cloned_tasks = []
                else:
                    finished_tasks = list(self._poll_workers())
                # 2. push new tasks triggered by the finished tasks to the queue
                self._queue_tasks(finished_tasks)
                # 3. if there are available worker slots, pop and start tasks
                #    from the queue.
                self._launch_workers()

                if time.time() - progress_tstamp > PROGRESS_INTERVAL:
                    progress_tstamp = time.time()

                    # Running tasks. Key 0 of ``_active_tasks`` is read as the
                    # overall running count; every other key maps to a
                    # (running, done) pair that is listed by name.
                    num_running = self._active_tasks[0]
                    if num_running == 0:
                        msg = "No tasks are running."
                    else:
                        tasks_print = ", ".join(
                            "%s (%d running; %d done)" % (k, v[0], v[1])
                            for k, v in self._active_tasks.items()
                            if k != 0 and v[0] > 0
                        )
                        if num_running == 1:
                            msg = "1 task is running: "
                        else:
                            msg = "%d tasks are running: " % num_running
                        msg += "%s." % tasks_print
                    self._logger(msg, system_msg=True)

                    # Queued tasks. No list follows these messages, so each
                    # variant is a complete sentence.
                    num_queued = len(self._run_queue)
                    if num_queued == 0:
                        msg = "No tasks are waiting in the queue."
                    elif num_queued == 1:
                        msg = "1 task is waiting in the queue."
                    else:
                        msg = "%d tasks are waiting in the queue." % num_queued
                    self._logger(msg, system_msg=True)

                    # Steps that have not started yet (only logged if any).
                    num_unprocessed = len(self._unprocessed_steps)
                    if num_unprocessed > 0:
                        if num_unprocessed == 1:
                            msg = "%s step has not started." % (
                                next(iter(self._unprocessed_steps)),
                            )
                        else:
                            msg = "%d steps have not started: " % num_unprocessed
                            msg += "%s." % ", ".join(self._unprocessed_steps)
                        self._logger(msg, system_msg=True)
        except KeyboardInterrupt as ex:
            self._logger("Workflow interrupted.", system_msg=True, bad=True)
            self._killall()
            exception = ex
            raise
        except Exception as ex:
            self._logger("Workflow failed.", system_msg=True, bad=True)
            self._killall()
            exception = ex
            raise
        finally:
            # on finish clean tasks
            for step in self._flow:
                for deco in step.decorators:
                    deco.runtime_finished(exception)

    # assert that end was executed and it was successful
    if ("end", ()) in self._finished:
        if self._run_url:
            self._logger(
                "Done! See the run in the UI at %s" % self._run_url,
                system_msg=True,
            )
        else:
            self._logger("Done!", system_msg=True)
    elif self._clone_only:
        self._logger(
            "Clone-only resume complete -- only previously successful steps were "
            "cloned; no new tasks executed!",
            system_msg=True,
        )
        # NOTE(review): assumes ``_params_task`` is always set for a
        # clone-only run -- confirm against the caller.
        self._params_task.mark_resume_done()
    else:
        raise MetaflowInternalError(
            "The *end* step was not successful by the end of flow."
        )