metaflow/runtime.py (1,354 lines of code) (raw):

""" Local backend Execute the flow with a native runtime using local / remote processes """ from __future__ import print_function import json import os import sys import fcntl import re import tempfile import time import subprocess from datetime import datetime from io import BytesIO from functools import partial from concurrent import futures from metaflow.datastore.exceptions import DataException from contextlib import contextmanager from . import get_namespace from .metadata_provider import MetaDatum from .metaflow_config import MAX_ATTEMPTS, UI_URL from .exception import ( MetaflowException, MetaflowInternalError, METAFLOW_EXIT_DISALLOW_RETRY, ) from . import procpoll from .datastore import TaskDataStoreSet from .debug import debug from .decorators import flow_decorators from .flowspec import _FlowState from .mflog import mflog, RUNTIME_LOG_SOURCE from .util import to_unicode, compress_list, unicode_type from .clone_util import clone_task_helper from .unbounded_foreach import ( CONTROL_TASK_TAG, UBF_CONTROL, UBF_TASK, ) from .user_configs.config_options import ConfigInput from .user_configs.config_parameters import dump_config_values import metaflow.tracing as tracing MAX_WORKERS = 16 MAX_NUM_SPLITS = 100 MAX_LOG_SIZE = 1024 * 1024 POLL_TIMEOUT = 1000 # ms PROGRESS_INTERVAL = 300 # s # The following is a list of the (data) artifacts used by the runtime while # executing a flow. These are prefetched during the resume operation by # leveraging the TaskDataStoreSet. PREFETCH_DATA_ARTIFACTS = [ "_foreach_stack", "_task_ok", "_transition", "_control_mapper_tasks", "_control_task_is_mapper_zero", ] RESUME_POLL_SECONDS = 60 # Runtime must use logsource=RUNTIME_LOG_SOURCE for all loglines that it # formats according to mflog. See a comment in mflog.__init__ mflog_msg = partial(mflog.decorate, RUNTIME_LOG_SOURCE) # TODO option: output dot graph periodically about execution class NativeRuntime(object): def __init__( self, flow, graph, flow_datastore, metadata, environment, package, logger, entrypoint, event_logger, monitor, run_id=None, clone_run_id=None, clone_only=False, reentrant=False, steps_to_rerun=None, max_workers=MAX_WORKERS, max_num_splits=MAX_NUM_SPLITS, max_log_size=MAX_LOG_SIZE, resume_identifier=None, ): if run_id is None: self._run_id = metadata.new_run_id() else: self._run_id = run_id metadata.register_run_id(run_id) self._flow = flow self._graph = graph self._flow_datastore = flow_datastore self._metadata = metadata self._environment = environment self._logger = logger self._max_workers = max_workers self._active_tasks = dict() # Key: step name; # value: [number of running tasks, number of done tasks] # Special key 0 is total number of running tasks self._active_tasks[0] = 0 self._unprocessed_steps = set([n.name for n in self._graph]) self._max_num_splits = max_num_splits self._max_log_size = max_log_size self._params_task = None self._entrypoint = entrypoint self.event_logger = event_logger self._monitor = monitor self._resume_identifier = resume_identifier self._clone_run_id = clone_run_id self._clone_only = clone_only self._cloned_tasks = [] self._ran_or_scheduled_task_index = set() self._reentrant = reentrant self._run_url = None # If steps_to_rerun is specified, we will not clone them in resume mode. self._steps_to_rerun = steps_to_rerun or {} # sorted_nodes are in topological order already, so we only need to # iterate through the nodes once to get a stable set of rerun steps. 
        for step_name in self._graph.sorted_nodes:
            if step_name in self._steps_to_rerun:
                out_funcs = self._graph[step_name].out_funcs or []
                for next_step in out_funcs:
                    self._steps_to_rerun.add(next_step)

        self._origin_ds_set = None
        if clone_run_id:
            # resume logic
            # 0. If clone_run_id is specified, attempt to clone all the
            #    successful tasks from the flow with `clone_run_id`. And run the
            #    unsuccessful or not-run steps in the regular fashion.
            # 1. With _find_origin_task, for every task in the current run, we
            #    find the equivalent task in `clone_run_id` using
            #    pathspec_index=run/step:[index] and verify if this task can be
            #    cloned.
            # 2. If yes, we fire off a clone-only task which copies the
            #    metadata from the `clone_origin` to pathspec=run/step/task to
            #    mimic that the resumed run looks like an actual run.
            # 3. All steps that couldn't be cloned (either unsuccessful or not
            #    run) are run as regular tasks.
            # Lastly, to improve the performance of the cloning process, we
            # leverage the TaskDataStoreSet abstraction to prefetch the
            # entire DAG of `clone_run_id` and relevant data artifacts
            # (see PREFETCH_DATA_ARTIFACTS) so that the entire runtime can
            # access the relevant data from cache (instead of going to the
            # datastore after the first prefetch).
            logger(
                "Gathering required information to resume run (this may take a bit of time)..."
            )
            self._origin_ds_set = TaskDataStoreSet(
                flow_datastore,
                clone_run_id,
                prefetch_data_artifacts=PREFETCH_DATA_ARTIFACTS,
            )
        self._run_queue = []
        self._poll = procpoll.make_poll()
        self._workers = {}  # fd -> subprocess mapping
        self._finished = {}
        self._is_cloned = {}
        # NOTE: In case of unbounded foreach, we need the following to schedule
        # the (sibling) mapper tasks of the control task (in case of resume);
        # and ensure that the join task runs only if all dependent tasks have
        # finished.
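        # (Illustration, not from the original source: _control_num_splits maps a
        # control task's pathspec to the number of mapper tasks it spawned, e.g.
        # {"1234/fanout/567": 4} for a hypothetical run 1234 whose control task
        # 1234/fanout/567 reported four mapper tasks.)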
        self._control_num_splits = {}  # control_task -> num_splits mapping

        for step in flow:
            for deco in step.decorators:
                deco.runtime_init(flow, graph, package, self._run_id)

    def _new_task(self, step, input_paths=None, **kwargs):
        if input_paths is None:
            may_clone = True
        else:
            may_clone = all(self._is_cloned[path] for path in input_paths)

        if step in self._steps_to_rerun:
            may_clone = False

        if step == "_parameters":
            decos = []
        else:
            decos = getattr(self._flow, step).decorators

        return Task(
            self._flow_datastore,
            self._flow,
            step,
            self._run_id,
            self._metadata,
            self._environment,
            self._entrypoint,
            self.event_logger,
            self._monitor,
            input_paths=input_paths,
            may_clone=may_clone,
            clone_run_id=self._clone_run_id,
            clone_only=self._clone_only,
            reentrant=self._reentrant,
            origin_ds_set=self._origin_ds_set,
            decos=decos,
            logger=self._logger,
            resume_identifier=self._resume_identifier,
            **kwargs,
        )

    @property
    def run_id(self):
        return self._run_id

    def persist_constants(self, task_id=None):
        self._params_task = self._new_task("_parameters", task_id=task_id)
        if not self._params_task.is_cloned:
            self._params_task.persist(self._flow)
        self._is_cloned[self._params_task.path] = self._params_task.is_cloned

    def should_skip_clone_only_execution(self):
        (
            should_skip_clone_only_execution,
            skip_reason,
        ) = self._should_skip_clone_only_execution()
        if should_skip_clone_only_execution:
            self._logger(skip_reason, system_msg=True)
            return True
        return False

    @contextmanager
    def run_heartbeat(self):
        self._metadata.start_run_heartbeat(self._flow.name, self._run_id)
        yield
        self._metadata.stop_heartbeat()

    def print_workflow_info(self):
        self._run_url = (
            "%s/%s/%s" % (UI_URL.rstrip("/"), self._flow.name, self._run_id)
            if UI_URL
            else None
        )

        if self._run_url:
            self._logger(
                "Workflow starting (run-id %s), see it in the UI at %s"
                % (
                    self._run_id,
                    self._run_url,
                ),
                system_msg=True,
            )
        else:
            self._logger(
                "Workflow starting (run-id %s):" % self._run_id, system_msg=True
            )

    def _should_skip_clone_only_execution(self):
        if self._clone_only and self._params_task:
            if self._params_task.resume_done():
                return True, "Resume already complete. Skipping clone-only execution."
            if not self._params_task.is_resume_leader():
                return (
                    True,
                    "Not the resume leader under resume execution. Skipping clone-only execution.",
                )
        return False, None

    def clone_task(
        self,
        step_name,
        task_id,
        pathspec_index,
        cloned_task_pathspec_index,
        finished_tuple,
        ubf_context,
        generate_task_obj,
        verbose=False,
    ):
        try:
            new_task_id = task_id
            if generate_task_obj:
                task = self._new_task(step_name, pathspec_index=pathspec_index)
                if ubf_context:
                    task.ubf_context = ubf_context
                new_task_id = task.task_id
                self._cloned_tasks.append(task)
                self._ran_or_scheduled_task_index.add(cloned_task_pathspec_index)
                task_pathspec = "{}/{}/{}".format(self._run_id, step_name, new_task_id)
            else:
                task_pathspec = "{}/{}/{}".format(self._run_id, step_name, new_task_id)
            Task.clone_pathspec_mapping[task_pathspec] = "{}/{}/{}".format(
                self._clone_run_id, step_name, task_id
            )
            if verbose:
                self._logger(
                    "Cloning task from {}/{}/{}/{} to {}/{}/{}/{}".format(
                        self._flow.name,
                        self._clone_run_id,
                        step_name,
                        task_id,
                        self._flow.name,
                        self._run_id,
                        step_name,
                        new_task_id,
                    ),
                    system_msg=True,
                )
            clone_task_helper(
                self._flow.name,
                self._clone_run_id,
                self._run_id,
                step_name,
                task_id,  # origin_task_id
                new_task_id,
                self._flow_datastore,
                self._metadata,
                origin_ds_set=self._origin_ds_set,
            )
            self._finished[(step_name, finished_tuple)] = task_pathspec
            self._is_cloned[task_pathspec] = True
        except Exception as e:
            self._logger(
                "Cloning {}/{}/{}/{} failed with error: {}".format(
                    self._flow.name, self._clone_run_id, step_name, task_id, str(e)
                )
            )

    def clone_original_run(self, generate_task_obj=False, verbose=True):
        self._logger(
            "Cloning {}/{}".format(self._flow.name, self._clone_run_id),
            system_msg=True,
        )

        inputs = []

        ubf_mapper_tasks_to_clone = set()
        ubf_control_tasks = set()
        # We only clone ubf mapper tasks if the control task is complete.
        # Here we need to check which control tasks are complete, and then get the
        # corresponding mapper tasks.
        for task_ds in self._origin_ds_set:
            _, step_name, task_id = task_ds.pathspec.split("/")
            pathspec_index = task_ds.pathspec_index
            if task_ds["_task_ok"] and step_name != "_parameters":
                # The control task contains "_control_mapper_tasks" but, in the case
                # of the @parallel decorator, the control task is also a mapper task
                # so we need to distinguish this using _control_task_is_mapper_zero
                control_mapper_tasks = (
                    []
                    if "_control_mapper_tasks" not in task_ds
                    else task_ds["_control_mapper_tasks"]
                )
                if control_mapper_tasks:
                    if task_ds.get("_control_task_is_mapper_zero", False):
                        # Strip the control task out of the list of mapper tasks
                        ubf_control_tasks.add(control_mapper_tasks[0])
                        ubf_mapper_tasks_to_clone.update(control_mapper_tasks[1:])
                    else:
                        ubf_mapper_tasks_to_clone.update(control_mapper_tasks)
                        # Since we only add mapper tasks here, if we are not in the
                        # list we are a control task
                        if task_ds.pathspec not in ubf_mapper_tasks_to_clone:
                            ubf_control_tasks.add(task_ds.pathspec)

        for task_ds in self._origin_ds_set:
            _, step_name, task_id = task_ds.pathspec.split("/")
            pathspec_index = task_ds.pathspec_index

            if (
                task_ds["_task_ok"]
                and step_name != "_parameters"
                and (step_name not in self._steps_to_rerun)
            ):
                # "_unbounded_foreach" is a special flag to indicate that the
                # transition is an unbounded foreach.
                # Both the parent and the split children tasks will have this flag set.
                # The split control/mapper tasks are not foreach types because UBF is
                # always followed by a join step.
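                # (Illustrative example, assuming a hypothetical flow
                # start -> fanout -> work -> join where "fanout" does an unbounded
                # foreach: the "fanout" task has graph type "foreach" and is not
                # treated as a UBF task below, while the "work" control and mapper
                # tasks carry the flag and are.)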
                is_ubf_task = (
                    "_unbounded_foreach" in task_ds and task_ds["_unbounded_foreach"]
                ) and (self._graph[step_name].type != "foreach")

                is_ubf_control_task = task_ds.pathspec in ubf_control_tasks

                is_ubf_mapper_task = is_ubf_task and (not is_ubf_control_task)

                if is_ubf_mapper_task and (
                    task_ds.pathspec not in ubf_mapper_tasks_to_clone
                ):
                    # Skip copying UBF mapper tasks if control task is incomplete.
                    continue

                ubf_context = None
                if is_ubf_task:
                    ubf_context = "ubf_test" if is_ubf_mapper_task else "ubf_control"

                finished_tuple = tuple(
                    [s._replace(value=0) for s in task_ds.get("_foreach_stack", ())]
                )
                cloned_task_pathspec_index = pathspec_index.split("/")[1]
                if task_ds.get("_control_task_is_mapper_zero", False):
                    # Replace None with index 0 for the control task as it is part of
                    # the UBF (as a mapper as well)
                    finished_tuple = finished_tuple[:-1] + (
                        finished_tuple[-1]._replace(index=0),
                    )
                    # We need this reverse override though because when we check
                    # if a task has been cloned in _queue_push, the index will be None
                    # because the _control_task_is_mapper_zero is set in the control
                    # task *itself* and *not* in the one that is launching the UBF
                    # nest. This means that _translate_index will use None.
                    cloned_task_pathspec_index = re.sub(
                        r"(\[(?:\d+, ?)*)0\]",
                        lambda m: (m.group(1) or "[") + "None]",
                        cloned_task_pathspec_index,
                    )
                inputs.append(
                    (
                        step_name,
                        task_id,
                        pathspec_index,
                        cloned_task_pathspec_index,
                        finished_tuple,
                        is_ubf_mapper_task,
                        ubf_context,
                    )
                )

        with futures.ThreadPoolExecutor(max_workers=self._max_workers) as executor:
            all_tasks = [
                executor.submit(
                    self.clone_task,
                    step_name,
                    task_id,
                    pathspec_index,
                    cloned_task_pathspec_index,
                    finished_tuple,
                    ubf_context=ubf_context,
                    generate_task_obj=generate_task_obj and (not is_ubf_mapper_task),
                    verbose=verbose,
                )
                for (
                    step_name,
                    task_id,
                    pathspec_index,
                    cloned_task_pathspec_index,
                    finished_tuple,
                    is_ubf_mapper_task,
                    ubf_context,
                ) in inputs
            ]
            _, _ = futures.wait(all_tasks)
        self._logger(
            "{}/{} cloned!".format(self._flow.name, self._clone_run_id), system_msg=True
        )
        self._params_task.mark_resume_done()

    def execute(self):
        if len(self._cloned_tasks) > 0:
            # mutable list storing the cloned tasks.
            self._run_queue = []
            self._active_tasks[0] = 0
        else:
            if self._params_task:
                self._queue_push("start", {"input_paths": [self._params_task.path]})
            else:
                self._queue_push("start", {})

        progress_tstamp = time.time()
        with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as config_file:
            # Configurations are passed through a file to avoid overloading the
            # command-line. We only need to create this file once and it can be
            # reused for any task launch
            config_value = dump_config_values(self._flow)
            if config_value:
                json.dump(config_value, config_file)
                config_file.flush()
                self._config_file_name = config_file.name
            else:
                self._config_file_name = None
            try:
                # main scheduling loop
                exception = None
                while (
                    self._run_queue or self._active_tasks[0] > 0 or self._cloned_tasks
                ):
                    # 1. are any of the current workers finished?
                    if self._cloned_tasks:
                        finished_tasks = self._cloned_tasks
                        # reset the list of cloned tasks and let poll_workers handle
                        # the remaining transition
                        self._cloned_tasks = []
                    else:
                        finished_tasks = list(self._poll_workers())
                    # 2. push new tasks triggered by the finished tasks to the queue
                    self._queue_tasks(finished_tasks)
                    # 3. if there are available worker slots, pop and start tasks
                    # from the queue.
                    self._launch_workers()

                    if time.time() - progress_tstamp > PROGRESS_INTERVAL:
                        progress_tstamp = time.time()
                        tasks_print = ", ".join(
                            [
                                "%s (%d running; %d done)" % (k, v[0], v[1])
                                for k, v in self._active_tasks.items()
                                if k != 0 and v[0] > 0
                            ]
                        )
                        if self._active_tasks[0] == 0:
                            msg = "No tasks are running."
                        else:
                            if self._active_tasks[0] == 1:
                                msg = "1 task is running: "
                            else:
                                msg = "%d tasks are running: " % self._active_tasks[0]
                            msg += "%s." % tasks_print

                        self._logger(msg, system_msg=True)

                        if len(self._run_queue) == 0:
                            msg = "No tasks are waiting in the queue."
                        else:
                            if len(self._run_queue) == 1:
                                msg = "1 task is waiting in the queue: "
                            else:
                                msg = "%d tasks are waiting in the queue." % len(
                                    self._run_queue
                                )

                        self._logger(msg, system_msg=True)

                        if len(self._unprocessed_steps) > 0:
                            if len(self._unprocessed_steps) == 1:
                                msg = "%s step has not started" % (
                                    next(iter(self._unprocessed_steps)),
                                )
                            else:
                                msg = "%d steps have not started: " % len(
                                    self._unprocessed_steps
                                )
                                msg += "%s." % ", ".join(self._unprocessed_steps)
                            self._logger(msg, system_msg=True)

            except KeyboardInterrupt as ex:
                self._logger("Workflow interrupted.", system_msg=True, bad=True)
                self._killall()
                exception = ex
                raise
            except Exception as ex:
                self._logger("Workflow failed.", system_msg=True, bad=True)
                self._killall()
                exception = ex
                raise
            finally:
                # on finish clean tasks
                for step in self._flow:
                    for deco in step.decorators:
                        deco.runtime_finished(exception)

        # assert that end was executed and it was successful
        if ("end", ()) in self._finished:
            if self._run_url:
                self._logger(
                    "Done! See the run in the UI at %s" % self._run_url,
                    system_msg=True,
                )
            else:
                self._logger("Done!", system_msg=True)
        elif self._clone_only:
            self._logger(
                "Clone-only resume complete -- only previously successful steps were "
                "cloned; no new tasks executed!",
                system_msg=True,
            )
            self._params_task.mark_resume_done()
        else:
            raise MetaflowInternalError(
                "The *end* step was not successful by the end of flow."
            )

    def _killall(self):
        # If we are here, all children have received a signal and are shutting down.
        # We want to give them an opportunity to do so and then kill
        live_workers = set(self._workers.values())
        now = int(time.time())
        self._logger(
            "Terminating %d active tasks..." % len(live_workers),
            system_msg=True,
            bad=True,
        )
        while live_workers and int(time.time()) - now < 5:
            # While not all workers are dead and we have waited less than 5 seconds
            live_workers = [worker for worker in live_workers if not worker.clean()]
        if live_workers:
            self._logger(
                "Killing %d remaining tasks after having waited for %d seconds -- "
                "some tasks may not exit cleanly"
                % (len(live_workers), int(time.time()) - now),
                system_msg=True,
                bad=True,
            )
            for worker in live_workers:
                worker.kill()
        self._logger("Flushing logs...", system_msg=True, bad=True)
        # give killed workers a chance to flush their logs to the datastore
        for _ in range(3):
            list(self._poll_workers())

    # Given the current task information (task_index), the type of transition,
    # and the split index, return the new task index.
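    # (Worked example for illustration, using a hypothetical task index "a[0,3]":
    # a "linear" transition to step b yields "b[0,3]", a "join" yields "b[0]",
    # and a "split" with split_index=2 yields "b[0,3,2]".)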
    def _translate_index(self, task, next_step, type, split_index=None):
        match = re.match(r"^(.+)\[(.*)\]$", task.task_index)
        if match:
            _, foreach_index = match.groups()
            # Convert foreach_index to a list of integers
            if len(foreach_index) > 0:
                foreach_index = foreach_index.split(",")
            else:
                foreach_index = []
        else:
            raise ValueError(
                "Index not in the format of {run_id}/{step_name}[{foreach_index}]"
            )
        if type == "linear":
            return "%s[%s]" % (next_step, ",".join(foreach_index))
        elif type == "join":
            indices = []
            if len(foreach_index) > 0:
                indices = foreach_index[:-1]
            return "%s[%s]" % (next_step, ",".join(indices))
        elif type == "split":
            foreach_index.append(str(split_index))
            return "%s[%s]" % (next_step, ",".join(foreach_index))

    # Store the parameters needed for task creation, so that pushing items
    # onto the run_queue is an inexpensive operation.
    def _queue_push(self, step, task_kwargs, index=None):
        # In the case of cloning, we set all the cloned tasks as the
        # finished tasks when pushing tasks using _queue_tasks. This means that we
        # could potentially try to push the same task multiple times (for example
        # if multiple parents of a join are cloned). We therefore keep track of what
        # has executed (been cloned) or what has been scheduled and avoid scheduling
        # it again.
        if index:
            if index in self._ran_or_scheduled_task_index:
                # It has already run or been scheduled
                return
            # Note that we are scheduling this to run
            self._ran_or_scheduled_task_index.add(index)
        self._run_queue.insert(0, (step, task_kwargs))
        # For foreaches, this will happen multiple times but that is ok; it becomes
        # a no-op
        self._unprocessed_steps.discard(step)

    def _queue_pop(self):
        return self._run_queue.pop() if self._run_queue else (None, {})

    def _queue_task_join(self, task, next_steps):
        # if the next step is a join, we need to check that
        # all input tasks for the join have finished before queuing it.

        # CHECK: this condition should be enforced by the linter but
        # let's assert that the assumption holds
        if len(next_steps) > 1:
            msg = (
                "Step *{step}* transitions to a join and another "
                "step. The join must be the only transition."
            )
            raise MetaflowInternalError(msg.format(step=task.step))
        else:
            next_step = next_steps[0]

        unbounded_foreach = not task.results.is_none("_unbounded_foreach")

        if unbounded_foreach:
            # Before we queue the join, do some post-processing of runtime state
            # (_finished, _is_cloned) for the (sibling) mapper tasks.
            # Update state of (sibling) mapper tasks for control task.
            if task.ubf_context == UBF_CONTROL:
                mapper_tasks = task.results.get("_control_mapper_tasks")
                if not mapper_tasks:
                    msg = (
                        "Step *{step}* has a control task which didn't "
                        "specify the artifact *_control_mapper_tasks* for "
                        "the subsequent *{join}* step."
                    )
                    raise MetaflowInternalError(
                        msg.format(step=task.step, join=next_steps[0])
                    )
                elif not (
                    isinstance(mapper_tasks, list)
                    and isinstance(mapper_tasks[0], unicode_type)
                ):
                    msg = (
                        "Step *{step}* has a control task which didn't "
                        "specify the artifact *_control_mapper_tasks* as a "
                        "list of strings but instead specified it as {typ} "
                        "with elements of {elem_typ}."
                    )
                    raise MetaflowInternalError(
                        msg.format(
                            step=task.step,
                            typ=type(mapper_tasks),
                            elem_typ=type(mapper_tasks[0]),
                        )
                    )

                num_splits = len(mapper_tasks)
                self._control_num_splits[task.path] = num_splits

                # If the control task is cloned, all mapper tasks should have been
                # cloned as well, so we no longer need to handle cloning of mapper
                # tasks in the runtime.

                # Update _finished if we are not cloned.
                # If we were cloned, we already updated _finished with the new tasks.
                # Note that the *value* of mapper tasks is incorrect and contains the
                # pathspec of the *cloned* run but we don't use it for anything. We
                # could look to clean it up though
                if not task.is_cloned:
                    _, foreach_stack = task.finished_id
                    top = foreach_stack[-1]
                    bottom = list(foreach_stack[:-1])
                    for i in range(num_splits):
                        s = tuple(bottom + [top._replace(index=i)])
                        self._finished[(task.step, s)] = mapper_tasks[i]
                        self._is_cloned[mapper_tasks[i]] = False

            # Find and check status of the control task and retrieve its pathspec
            # for retrieving unbounded foreach cardinality.
            _, foreach_stack = task.finished_id
            top = foreach_stack[-1]
            bottom = list(foreach_stack[:-1])
            s = tuple(bottom + [top._replace(index=None)])
            # UBF control can also be the first task of the list. Then
            # it will have index=0 instead of index=None.
            if task.results.get("_control_task_is_mapper_zero", False):
                s = tuple(bottom + [top._replace(index=0)])
            control_path = self._finished.get((task.step, s))

            if control_path:
                # Control task was successful.
                # Additionally check the state of (sibling) mapper tasks as well
                # (for the sake of resume) before queueing the join task.
                num_splits = self._control_num_splits[control_path]
                required_tasks = []
                for i in range(num_splits):
                    s = tuple(bottom + [top._replace(index=i)])
                    required_tasks.append(self._finished.get((task.step, s)))
                if all(required_tasks):
                    index = self._translate_index(task, next_step, "join")
                    # all tasks to be joined are ready. Schedule the next join step.
                    self._queue_push(
                        next_step,
                        {"input_paths": required_tasks, "join_type": "foreach"},
                        index,
                    )
        else:
            # matching_split is the split-parent of the finished task
            matching_split = self._graph[self._graph[next_step].split_parents[-1]]
            _, foreach_stack = task.finished_id
            index = ""
            if matching_split.type == "foreach":
                # next step is a foreach join

                def siblings(foreach_stack):
                    top = foreach_stack[-1]
                    bottom = list(foreach_stack[:-1])
                    for index in range(top.num_splits):
                        yield tuple(bottom + [top._replace(index=index)])

                # required tasks are all split-siblings of the finished task
                required_tasks = [
                    self._finished.get((task.step, s)) for s in siblings(foreach_stack)
                ]
                join_type = "foreach"
                index = self._translate_index(task, next_step, "join")
            else:
                # next step is a split
                # required tasks are all branches joined by the next step
                required_tasks = [
                    self._finished.get((step, foreach_stack))
                    for step in self._graph[next_step].in_funcs
                ]
                join_type = "linear"
                index = self._translate_index(task, next_step, "linear")

            if all(required_tasks):
                # all tasks to be joined are ready. Schedule the next join step.
                self._queue_push(
                    next_step,
                    {"input_paths": required_tasks, "join_type": join_type},
                    index,
                )

    def _queue_task_foreach(self, task, next_steps):
        # CHECK: this condition should be enforced by the linter but
        # let's assert that the assumption holds
        if len(next_steps) > 1:
            msg = (
                "Step *{step}* makes a foreach split but it defines "
                "multiple transitions. Specify only one transition "
                "for foreach."
            )
            raise MetaflowInternalError(msg.format(step=task.step))
        else:
            next_step = next_steps[0]

        unbounded_foreach = not task.results.is_none("_unbounded_foreach")

        if unbounded_foreach:
            # Need to push a control-process-related task.
            ubf_iter_name = task.results.get("_foreach_var")
            ubf_iter = task.results.get(ubf_iter_name)
            # The UBF control task has no split index, hence "None" as a placeholder.
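            # (Illustration, not in the original: for a parent task with index
            # "fanout[]", the control task is queued with index "work[None]", or
            # "work[0]" when the control task also acts as mapper 0; "work" is a
            # hypothetical child step name.)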
            if task.results.get("_control_task_is_mapper_zero", False):
                index = self._translate_index(task, next_step, "split", 0)
            else:
                index = self._translate_index(task, next_step, "split", None)

            self._queue_push(
                next_step,
                {
                    "input_paths": [task.path],
                    "ubf_context": UBF_CONTROL,
                    "ubf_iter": ubf_iter,
                },
                index,
            )
        else:
            num_splits = task.results["_foreach_num_splits"]
            if num_splits > self._max_num_splits:
                msg = (
                    "Foreach in step *{step}* yielded {num} child steps "
                    "which is more than the current maximum of {max} "
                    "children. You can raise the maximum with the "
                    "--max-num-splits option. "
                )
                raise TaskFailed(
                    task,
                    msg.format(
                        step=task.step, num=num_splits, max=self._max_num_splits
                    ),
                )

            # schedule all splits
            for i in range(num_splits):
                index = self._translate_index(task, next_step, "split", i)
                self._queue_push(
                    next_step,
                    {"split_index": str(i), "input_paths": [task.path]},
                    index,
                )

    def _queue_tasks(self, finished_tasks):
        # finished tasks include only successful tasks
        for task in finished_tasks:
            self._finished[task.finished_id] = task.path
            self._is_cloned[task.path] = task.is_cloned

            # CHECK: ensure that runtime transitions match with
            # statically inferred transitions. Make an exception for control
            # tasks, where we just rely on static analysis since we don't
            # execute user code.
            trans = task.results.get("_transition")
            if trans:
                next_steps = trans[0]
                foreach = trans[1]
            else:
                next_steps = []
                foreach = None
            expected = self._graph[task.step].out_funcs
            if next_steps != expected:
                msg = (
                    "Based on static analysis of the code, step *{step}* "
                    "was expected to transition to step(s) *{expected}*. "
                    "However, when the code was executed, self.next() was "
                    "called with *{actual}*. Make sure there is only one "
                    "unconditional self.next() call at the end of your "
                    "step. "
                )
                raise MetaflowInternalError(
                    msg.format(
                        step=task.step,
                        expected=", ".join(expected),
                        actual=", ".join(next_steps),
                    )
                )

            # Different transition types require different treatment
            if any(self._graph[f].type == "join" for f in next_steps):
                # Next step is a join
                self._queue_task_join(task, next_steps)
            elif foreach:
                # Next step is a foreach child
                self._queue_task_foreach(task, next_steps)
            else:
                # Next steps are normal linear steps
                for step in next_steps:
                    index = self._translate_index(task, step, "linear")
                    self._queue_push(step, {"input_paths": [task.path]}, index)

    def _poll_workers(self):
        if self._workers:
            for event in self._poll.poll(POLL_TIMEOUT):
                worker = self._workers.get(event.fd)
                if worker:
                    if event.can_read:
                        worker.read_logline(event.fd)
                    if event.is_terminated:
                        returncode = worker.terminate()

                        for fd in worker.fds():
                            self._poll.remove(fd)
                            del self._workers[fd]
                        step_counts = self._active_tasks[worker.task.step]
                        step_counts[0] -= 1  # One less task for this step is running
                        step_counts[1] += 1  # ... and one more completed.
                        # We never remove entries from self._active_tasks because it
                        # is possible for all currently running tasks of a step to
                        # complete while others are still queued up.
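                        # (Illustration: self._active_tasks might look like
                        # {0: 3, "start": [0, 1], "train": [3, 5]}, i.e. key 0 holds
                        # the total number of running tasks and each step maps to
                        # [currently running, completed]. Step names are hypothetical.)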
                        self._active_tasks[0] -= 1

                        task = worker.task
                        if returncode:
                            # worker did not finish successfully
                            if (
                                worker.cleaned
                                or returncode == METAFLOW_EXIT_DISALLOW_RETRY
                            ):
                                self._logger(
                                    "This failed task will not be retried.",
                                    system_msg=True,
                                )
                            else:
                                if (
                                    task.retries
                                    < task.user_code_retries + task.error_retries
                                ):
                                    self._retry_worker(worker)
                                else:
                                    raise TaskFailed(task)
                        else:
                            # worker finished successfully
                            yield task

    def _launch_workers(self):
        while self._run_queue and self._active_tasks[0] < self._max_workers:
            step, task_kwargs = self._queue_pop()
            # Initialize the task (which can be expensive using remote datastores)
            # before launching the worker so that the cost is amortized over time,
            # instead of doing it during _queue_push.
            task = self._new_task(step, **task_kwargs)
            self._launch_worker(task)

    def _retry_worker(self, worker):
        worker.task.retries += 1
        if worker.task.retries >= MAX_ATTEMPTS:
            # any results with an attempt ID >= MAX_ATTEMPTS will be ignored
            # by the datastore, so running a task with such a retry count would
            # be pointless and dangerous
            raise MetaflowInternalError(
                "Too many task attempts (%d)! "
                "MAX_ATTEMPTS exceeded." % worker.task.retries
            )

        worker.task.new_attempt()
        self._launch_worker(worker.task)

    def _launch_worker(self, task):
        if self._clone_only and not task.is_cloned:
            # We don't launch a worker here
            self._logger(
                "Not executing non-cloned task for step '%s' in clone-only resume"
                % "/".join([task.flow_name, task.run_id, task.step]),
                system_msg=True,
            )
            return

        worker = Worker(task, self._max_log_size, self._config_file_name)
        for fd in worker.fds():
            self._workers[fd] = worker
            self._poll.add(fd)
        active_step_counts = self._active_tasks.setdefault(task.step, [0, 0])
        # We have an additional task for this step running
        active_step_counts[0] += 1  # One more task actively running
        self._active_tasks[0] += 1


class Task(object):
    clone_pathspec_mapping = {}

    def __init__(
        self,
        flow_datastore,
        flow,
        step,
        run_id,
        metadata,
        environment,
        entrypoint,
        event_logger,
        monitor,
        input_paths=None,
        may_clone=False,
        clone_run_id=None,
        clone_only=False,
        reentrant=False,
        origin_ds_set=None,
        decos=None,
        logger=None,
        # Anything below this is passed as part of kwargs
        split_index=None,
        ubf_context=None,
        ubf_iter=None,
        join_type=None,
        task_id=None,
        resume_identifier=None,
        pathspec_index=None,
    ):
        self.step = step
        self.flow = flow
        self.flow_name = flow.name
        self.run_id = run_id
        self.task_id = None
        self._path = None
        self.input_paths = input_paths
        self.split_index = split_index
        self.ubf_context = ubf_context
        self.ubf_iter = ubf_iter
        self.decos = [] if decos is None else decos
        self.entrypoint = entrypoint
        self.environment = environment
        self.environment_type = self.environment.TYPE
        self.clone_run_id = clone_run_id
        self.clone_origin = None
        self.origin_ds_set = origin_ds_set
        self.metadata = metadata
        self.event_logger = event_logger
        self.monitor = monitor
        self._logger = logger

        self.retries = 0
        self.user_code_retries = 0
        self.error_retries = 0

        self.tags = metadata.sticky_tags
        self.event_logger_type = self.event_logger.TYPE
        self.monitor_type = monitor.TYPE

        self.metadata_type = metadata.TYPE
        self.datastore_type = flow_datastore.TYPE
        self._flow_datastore = flow_datastore
        self.datastore_sysroot = flow_datastore.datastore_root
        self._results_ds = None

        # Only used in clone-only resume.
        self._is_resume_leader = None
        self._resume_done = None
        self._resume_identifier = resume_identifier

        origin = None
        if clone_run_id and may_clone:
            origin = self._find_origin_task(clone_run_id, join_type, pathspec_index)

        if origin and origin["_task_ok"]:
            # At this point, we know we are going to clone
            self._is_cloned = True

            task_id_exists_already = False
            task_completed = False
            if reentrant:
                # A re-entrant clone basically allows multiple concurrent processes
                # to perform the clone at the same time to the same new run id. Let's
                # assume two processes A and B both simultaneously calling
                # `resume --reentrant --run-id XX`.
                # We want to guarantee that:
                #   - All incomplete tasks are cloned exactly once.
                # To achieve this, we will select a resume leader and let it clone the
                # entire execution graph. This ensures that we only write once to the
                # datastore and metadata.
                #
                # We use the cloned _parameter task's task-id as the "key" to
                # synchronize on. We try to "register" this new task-id (or rather
                # the full pathspec <run>/<step>/<taskid>) with the metadata service
                # which will indicate if we actually registered it or if it existed
                # already. If we did manage to register it (_parameter task), we are
                # the "elected resume leader" in essence and proceed to clone. If we
                # didn't, we just wait to make sure the entire clone execution is
                # fully done (ie: the clone is finished).
                if task_id is not None:
                    # Sanity check -- this should never happen. We cannot allow
                    # for explicit task-ids because in the reentrant case, we use the
                    # cloned task's id as the new task's id.
                    raise MetaflowInternalError(
                        "Reentrant clone-only resume does not allow for explicit task-id"
                    )
                if resume_identifier:
                    self.log(
                        "Resume identifier is %s." % resume_identifier,
                        system_msg=True,
                    )
                else:
                    raise MetaflowInternalError(
                        "Reentrant clone-only resume needs a resume identifier."
                    )
                # We will use the same task_id as the original task
                # to use it effectively as a synchronization key
                clone_task_id = origin.task_id
                # Make sure the task-id is a non-integer so that it does not clash
                # with task ids assigned by the metadata provider. If this is an
                # integer, we add some string to it. It doesn't matter what we add
                # as long as it is consistent.
                try:
                    clone_task_int = int(clone_task_id)
                    clone_task_id = "resume-%d" % clone_task_int
                except ValueError:
                    pass
                # If _get_task_id returns True it means the task already existed, so
                # we wait for it.
                task_id_exists_already = self._get_task_id(clone_task_id)

                # We may not have access to the task datastore on the first resume
                # attempt, but on later resume attempts we should check whether the
                # resume task is complete. This fixes the issue where the resume
                # leader was killed unexpectedly during cloning and never marked the
                # task complete.
                try:
                    task_completed = self.results["_task_ok"]
                except DataException as e:
                    pass
            else:
                self._get_task_id(task_id)

            # Store the mapping from current_pathspec -> origin_pathspec which
            # will be useful for looking up origin_ds_set in find_origin_task.
            self.clone_pathspec_mapping[self._path] = origin.pathspec
            if self.step == "_parameters":
                # In the _parameters task, we need to resolve who is the resume leader.
                self._is_resume_leader = False
                resume_leader = None

                if task_id_exists_already:
                    # If the task id already exists, we need to check whether the
                    # current task was the resume leader in a previous attempt.
                    ds = self._flow_datastore.get_task_datastore(
                        self.run_id, self.step, self.task_id
                    )

                    if not ds["_task_ok"]:
                        raise MetaflowInternalError(
                            "Externally cloned _parameters task did not succeed"
                        )

                    # Check if we should be the resume leader (maybe from a previous
                    # attempt). To avoid the edge case where the resume leader is
                    # selected but has not yet written the _resume_leader metadata,
                    # we will wait for a few seconds. We will wait for the resume
                    # leader at most 3 times.
                    for _ in range(3):
                        if ds.has_metadata("_resume_leader", add_attempt=False):
                            resume_leader = ds.load_metadata(
                                ["_resume_leader"], add_attempt=False
                            )["_resume_leader"]
                            self._is_resume_leader = resume_leader == resume_identifier
                        else:
                            self.log(
                                "Waiting for resume leader to be selected. Sleeping ...",
                                system_msg=True,
                            )
                            time.sleep(3)
                else:
                    # If the task id does not exist, the current task is the resume
                    # leader.
                    resume_leader = resume_identifier
                    self._is_resume_leader = True

                if reentrant:
                    if resume_leader:
                        self.log(
                            "Resume leader is %s." % resume_leader,
                            system_msg=True,
                        )
                    else:
                        raise MetaflowInternalError(
                            "Cannot determine the resume leader in distributed resume mode."
                        )

                if self._is_resume_leader:
                    if reentrant:
                        self.log(
                            "Selected as the reentrant clone leader.",
                            system_msg=True,
                        )
                    # Clone in place without relying on run_queue.
                    self.new_attempt()
                    self._ds.clone(origin)
                    # Set the resume leader to be the task that calls resume (the
                    # first task to clone the _parameters task). We always set the
                    # resume leader regardless of whether we are in the distributed
                    # resume case or not.
                    if resume_identifier:
                        self._ds.save_metadata(
                            {"_resume_leader": resume_identifier}, add_attempt=False
                        )
                    self._ds.done()
                else:
                    # Wait for the resume leader to complete
                    while True:
                        ds = self._flow_datastore.get_task_datastore(
                            self.run_id, self.step, self.task_id
                        )
                        # Check if resume is complete. The resume leader will write
                        # the done file.
                        self._resume_done = ds.has_metadata(
                            "_resume_done", add_attempt=False
                        )

                        if self._resume_done:
                            break

                        self.log(
                            "Waiting for resume leader to complete. Sleeping for %ds..."
                            % RESUME_POLL_SECONDS,
                            system_msg=True,
                        )
                        time.sleep(RESUME_POLL_SECONDS)

                    self.log(
                        "_parameters clone completed by resume leader", system_msg=True
                    )
            else:
                # Only the leader can reach non-parameter steps in resume.
                # Store the origin pathspec in clone_origin so this can be run
                # as a task by the runtime.
                self.clone_origin = origin.pathspec
                # Save a call to creating the results_ds since it is the same as
                # origin.
                self._results_ds = origin

                # If the task is already completed in the new run, we don't need to
                # clone it.
                self._should_skip_cloning = task_completed
                if self._should_skip_cloning:
                    self.log(
                        "Skipping cloning of previously run task %s"
                        % self.clone_origin,
                        system_msg=True,
                    )
                else:
                    self.log(
                        "Cloning previously run task %s" % self.clone_origin,
                        system_msg=True,
                    )
        else:
            self._is_cloned = False
            if clone_only:
                # We are done -- we don't proceed to create new task-ids
                return

            self._get_task_id(task_id)

        # Open the output datastore only if the task is not being cloned.
        if not self._is_cloned:
            self.new_attempt()

            for deco in decos:
                deco.runtime_task_created(
                    self._ds,
                    task_id,
                    split_index,
                    input_paths,
                    self._is_cloned,
                    ubf_context,
                )

                # determine the number of retries of this task
                user_code_retries, error_retries = deco.step_task_retry_count()
                if user_code_retries is None and error_retries is None:
                    # This signals the runtime that the task doesn't want any
                    # retries indifferent to other decorator opinions.
                    # NOTE: This is needed since we don't statically disallow
                    # specifying `@retry` in combination with decorators which
                    # implement `unbounded_foreach` semantics. This allows for
                    # ergonomic user invocation of `--with retry`; instead
                    # choosing to specially handle this way in the runtime.
                    self.user_code_retries = None
                    self.error_retries = None
                if (
                    self.user_code_retries is not None
                    and self.error_retries is not None
                ):
                    self.user_code_retries = max(
                        self.user_code_retries, user_code_retries
                    )
                    self.error_retries = max(self.error_retries, error_retries)

            if self.user_code_retries is None and self.error_retries is None:
                self.user_code_retries = 0
                self.error_retries = 0

    def new_attempt(self):
        self._ds = self._flow_datastore.get_task_datastore(
            self.run_id, self.step, self.task_id, attempt=self.retries, mode="w"
        )
        self._ds.init_task()

    def log(self, msg, system_msg=False, pid=None, timestamp=True):
        if pid:
            prefix = "[%s (pid %s)] " % (self._path, pid)
        else:
            prefix = "[%s] " % self._path

        self._logger(msg, head=prefix, system_msg=system_msg, timestamp=timestamp)
        sys.stdout.flush()

    def is_resume_leader(self):
        assert (
            self.step == "_parameters"
        ), "Only the _parameters step can check the resume leader."
        return self._is_resume_leader

    def resume_done(self):
        assert (
            self.step == "_parameters"
        ), "Only the _parameters step can check whether resume is complete."
        return self._resume_done

    def mark_resume_done(self):
        assert (
            self.step == "_parameters"
        ), "Only the _parameters step can mark resume as done."
        assert (
            self.is_resume_leader()
        ), "Only the resume leader can mark resume as done."

        # Mark the resume as done. This is called at the end of the resume flow and
        # after the _parameters step was successfully cloned, so we need to
        # 'dangerously' save this done file, but the risk should be minimal.
        self._ds._dangerous_save_metadata_post_done(
            {"_resume_done": True}, add_attempt=False
        )

    def _get_task_id(self, task_id):
        already_existed = True
        tags = []
        if self.ubf_context == UBF_CONTROL:
            tags = [CONTROL_TASK_TAG]
        # Register Metaflow tasks.
        if task_id is None:
            task_id = str(
                self.metadata.new_task_id(self.run_id, self.step, sys_tags=tags)
            )
            already_existed = False
        else:
            # task_id is preset only by persist_constants().
            already_existed = not self.metadata.register_task_id(
                self.run_id,
                self.step,
                task_id,
                0,
                sys_tags=tags,
            )

        self.task_id = task_id
        self._path = "%s/%s/%s" % (self.run_id, self.step, self.task_id)

        return already_existed

    def _find_origin_task(self, clone_run_id, join_type, pathspec_index=None):
        if pathspec_index:
            origin = self.origin_ds_set.get_with_pathspec_index(pathspec_index)
            return origin
        elif self.step == "_parameters":
            pathspec = "%s/_parameters[]" % clone_run_id
            origin = self.origin_ds_set.get_with_pathspec_index(pathspec)

            if origin is None:
                # This is just for usability: We could rerun the whole flow
                # if an unknown clone_run_id is provided but probably this is
                # not what the user intended, so raise a warning
                raise MetaflowException(
                    "Resume could not find run id *%s*" % clone_run_id
                )
            else:
                return origin
        else:
            # all inputs must have the same foreach stack, so we can safely
            # pick the first one
            parent_pathspec = self.input_paths[0]
            origin_parent_pathspec = self.clone_pathspec_mapping[parent_pathspec]
            parent = self.origin_ds_set.get_with_pathspec(origin_parent_pathspec)
            # The parent should be non-None since we only clone the child if the
            # parent was successfully cloned.
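            # (Illustrative example, with hypothetical values: if the parent's
            # foreach stack has indices [0, 3], a foreach-join child looks up
            # "<clone_run_id>/<step>[0]", a foreach-split child with split_index=2
            # looks up "<clone_run_id>/<step>[0,3,2]", and a linear child looks up
            # "<clone_run_id>/<step>[0,3]".)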
            foreach_stack = parent["_foreach_stack"]

            if join_type == "foreach":
                # foreach-join pops the topmost index
                index = ",".join(str(s.index) for s in foreach_stack[:-1])
            elif self.split_index or self.ubf_context == UBF_CONTROL:
                # foreach-split pushes a new index
                index = ",".join(
                    [str(s.index) for s in foreach_stack] + [str(self.split_index)]
                )
            else:
                # all other transitions keep the parent's foreach stack intact
                index = ",".join(str(s.index) for s in foreach_stack)

            pathspec = "%s/%s[%s]" % (clone_run_id, self.step, index)
            return self.origin_ds_set.get_with_pathspec_index(pathspec)

    @property
    def path(self):
        return self._path

    @property
    def results(self):
        if self._results_ds:
            return self._results_ds
        else:
            self._results_ds = self._flow_datastore.get_task_datastore(
                self.run_id, self.step, self.task_id
            )
            return self._results_ds

    @property
    def task_index(self):
        _, task_index = self.results.pathspec_index.split("/")
        return task_index

    @property
    def finished_id(self):
        # note: id is not available before the task has finished.
        # Index already identifies the task within the foreach;
        # we remove the foreach value so that it is easier to
        # identify siblings within a foreach.
        foreach_stack_tuple = tuple(
            [s._replace(value=0) for s in self.results["_foreach_stack"]]
        )
        return (self.step, foreach_stack_tuple)

    @property
    def is_cloned(self):
        return self._is_cloned

    @property
    def should_skip_cloning(self):
        return self._should_skip_cloning

    def persist(self, flow):
        # this is used to persist parameters before the start step
        flow._task_ok = flow._success = True
        flow._foreach_stack = []
        self._ds.persist(flow)
        self._ds.done()

    def save_logs(self, logtype_to_logs):
        self._ds.save_logs(RUNTIME_LOG_SOURCE, logtype_to_logs)

    def save_metadata(self, name, metadata):
        self._ds.save_metadata({name: metadata})

    def __str__(self):
        return " ".join(self._args)


class TaskFailed(MetaflowException):
    headline = "Step failure"

    def __init__(self, task, msg=""):
        body = "Step *%s* (task-id %s) failed" % (task.step, task.task_id)
        if msg:
            body = "%s: %s" % (body, msg)
        else:
            body += "."

        super(TaskFailed, self).__init__(body)


class TruncatedBuffer(object):
    def __init__(self, name, maxsize):
        self.name = name
        self._maxsize = maxsize
        self._buffer = BytesIO()
        self._size = 0
        self._eof = False

    def write(self, bytedata, system_msg=False):
        if system_msg:
            self._buffer.write(bytedata)
        elif not self._eof:
            if self._size + len(bytedata) < self._maxsize:
                self._buffer.write(bytedata)
                self._size += len(bytedata)
            else:
                msg = b"[TRUNCATED - MAXIMUM LOG FILE SIZE REACHED]\n"
                self._buffer.write(mflog_msg(msg))
                self._eof = True

    def get_bytes(self):
        return self._buffer.getvalue()

    def get_buffer(self):
        self._buffer.seek(0)
        return self._buffer


class CLIArgs(object):
    """
    Container that allows decorators to modify the command line parameters
    for step execution in StepDecorator.runtime_step_cli().
    """

    def __init__(self, task):
        self.task = task
        self.entrypoint = list(task.entrypoint)
        self.top_level_options = {
            "quiet": True,
            "metadata": self.task.metadata_type,
            "environment": self.task.environment_type,
            "datastore": self.task.datastore_type,
            "pylint": False,
            "event-logger": self.task.event_logger_type,
            "monitor": self.task.monitor_type,
            "datastore-root": self.task.datastore_sysroot,
            "with": [
                deco.make_decorator_spec()
                for deco in self.task.decos
                if not deco.statically_defined
            ],
        }
        # FlowDecorators can define their own top-level options. They are
        # responsible for adding their own top-level options and values through
        # the get_top_level_options() hook.
        for deco in flow_decorators(self.task.flow):
            self.top_level_options.update(deco.get_top_level_options())

        # We also pass configuration options using the kv.<name> syntax which will
        # cause the configuration options to be loaded from the CONFIG file (or
        # local-config-file in the case of the local runtime)
        configs = self.task.flow._flow_state.get(_FlowState.CONFIGS)
        if configs:
            self.top_level_options["config-value"] = [
                (k, ConfigInput.make_key_name(k)) for k in configs
            ]

        self.commands = ["step"]
        self.command_args = [self.task.step]
        self.command_options = {
            "run-id": task.run_id,
            "task-id": task.task_id,
            "input-paths": compress_list(task.input_paths),
            "split-index": task.split_index,
            "retry-count": task.retries,
            "max-user-code-retries": task.user_code_retries,
            "tag": task.tags,
            "namespace": get_namespace() or "",
            "ubf-context": task.ubf_context,
        }
        self.env = {}

    def get_args(self):
        # TODO: Make one with dict_to_cli_options; see cli_args.py for more detail
        def _options(mapping):
            for k, v in mapping.items():
                # None or False arguments are ignored
                # v needs to be explicitly False, not falsy, e.g. 0 is an acceptable
                # value
                if v is None or v is False:
                    continue

                # we need special handling for 'with' since it is a reserved
                # keyword in Python, so we call it 'decospecs' in click args
                if k == "decospecs":
                    k = "with"
                k = k.replace("_", "-")
                v = v if isinstance(v, (list, tuple, set)) else [v]
                for value in v:
                    yield "--%s" % k
                    if not isinstance(value, bool):
                        value = value if isinstance(value, tuple) else (value,)
                        for vv in value:
                            yield to_unicode(vv)

        args = list(self.entrypoint)
        args.extend(_options(self.top_level_options))
        args.extend(self.commands)
        args.extend(self.command_args)
        args.extend(_options(self.command_options))
        return args

    def get_env(self):
        return self.env

    def __str__(self):
        return " ".join(self.get_args())


class Worker(object):
    def __init__(self, task, max_logs_size, config_file_name):
        self.task = task
        self._config_file_name = config_file_name
        self._proc = self._launch()

        if task.retries > task.user_code_retries:
            self.task.log(
                "Task fallback is starting to handle the failure.",
                system_msg=True,
                pid=self._proc.pid,
            )
        elif not task.is_cloned:
            suffix = " (retry)." if task.retries else "."
            self.task.log(
                "Task is starting" + suffix, system_msg=True, pid=self._proc.pid
            )

        self._stdout = TruncatedBuffer("stdout", max_logs_size)
        self._stderr = TruncatedBuffer("stderr", max_logs_size)

        self._logs = {
            self._proc.stderr.fileno(): (self._proc.stderr, self._stderr),
            self._proc.stdout.fileno(): (self._proc.stdout, self._stdout),
        }

        self._encoding = sys.stdout.encoding or "UTF-8"
        self.killed = False  # Killed indicates that the task was forcibly killed
        # with SIGKILL by the master process.
        # A killed task is always considered cleaned
        self.cleaned = False  # A cleaned task is one that is shutting down and has
        # been noticed by the runtime and queried for its state (whether or
        # not it is properly shut down)

    def _launch(self):
        args = CLIArgs(self.task)
        env = dict(os.environ)

        if self.task.clone_run_id:
            args.command_options["clone-run-id"] = self.task.clone_run_id

        if self.task.is_cloned and self.task.clone_origin:
            args.command_options["clone-only"] = self.task.clone_origin
            # disabling sidecars for cloned tasks due to perf reasons
            args.top_level_options["event-logger"] = "nullSidecarLogger"
            args.top_level_options["monitor"] = "nullSidecarMonitor"
        else:
            # decorators may modify the CLIArgs object in-place
            for deco in self.task.decos:
                deco.runtime_step_cli(
                    args,
                    self.task.retries,
                    self.task.user_code_retries,
                    self.task.ubf_context,
                )

        # Add user configurations using a file to avoid using up too much space on
        # the command line
        if self._config_file_name:
            args.top_level_options["local-config-file"] = self._config_file_name
        # Pass configuration options
        env.update(args.get_env())

        env["PYTHONUNBUFFERED"] = "x"
        tracing.inject_tracing_vars(env)
        # NOTE bufsize=1 below enables line buffering which is required
        # by read_logline() below that relies on readline() not blocking
        # print('running', args)
        cmdline = args.get_args()
        debug.subcommand_exec(cmdline)
        return subprocess.Popen(
            cmdline,
            env=env,
            bufsize=1,
            stdin=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )

    def emit_log(self, msg, buf, system_msg=False):
        if mflog.is_structured(msg):
            res = mflog.parse(msg)
            if res:
                # parsing successful
                plain = res.msg
                timestamp = res.utc_tstamp
                if res.should_persist:
                    # in special circumstances we may receive structured
                    # loglines that haven't been properly persisted upstream.
                    # This is the case if, for example, a task crashes in an external
                    # system and we retrieve the remaining logs after the crash.
                    # Those lines are marked with a special tag, should_persist,
                    # which we process here
                    buf.write(mflog.unset_should_persist(msg))
            else:
                # parsing failed, corrupted logline. Print it as-is
                timestamp = datetime.utcnow()
                plain = msg
        else:
            # If the line isn't formatted with mflog already, we format it here.
            plain = msg
            timestamp = datetime.utcnow()
            # store unformatted loglines in the buffer that will be
            # persisted, assuming that all previously formatted loglines have
            # been already persisted at the source.
            buf.write(mflog_msg(msg, now=timestamp), system_msg=system_msg)

        text = plain.strip().decode(self._encoding, errors="replace")
        self.task.log(
            text,
            pid=self._proc.pid,
            timestamp=mflog.utc_to_local(timestamp),
            system_msg=system_msg,
        )

    def read_logline(self, fd):
        fileobj, buf = self._logs[fd]
        # readline() below should never block thanks to polling and
        # line buffering.
        # If it does, things will deadlock
        line = fileobj.readline()
        if line:
            self.emit_log(line, buf)
            return True
        else:
            return False

    def fds(self):
        return (self._proc.stderr.fileno(), self._proc.stdout.fileno())

    def clean(self):
        if self.killed:
            return True
        if not self.cleaned:
            for fileobj, buf in self._logs.values():
                msg = b"[KILLED BY ORCHESTRATOR]\n"
                self.emit_log(msg, buf, system_msg=True)
            self.cleaned = True
        return self._proc.poll() is not None

    def kill(self):
        if not self.killed:
            try:
                self._proc.kill()
            except:
                pass
            self.cleaned = True
            self.killed = True

    def terminate(self):
        # this shouldn't block, since terminate() is called only
        # after the poller has decided that the worker is dead
        returncode = self._proc.wait()

        # consume all remaining loglines.
        # we set the file descriptor to be non-blocking, since
        # the pipe may stay active due to subprocesses launched by
        # the worker, e.g. sidecars, so we can't rely on EOF. We try to
        # read just what's available in the pipe buffer
        for fileobj, buf in self._logs.values():
            fileno = fileobj.fileno()
            fcntl.fcntl(fileno, fcntl.F_SETFL, os.O_NONBLOCK)
            try:
                while self.read_logline(fileno):
                    pass
            except:
                # ignore "resource temporarily unavailable" etc. errors
                # caused due to non-blocking. Draining is done on a
                # best-effort basis.
                pass

        # Return early if the task is cloned since we don't want to
        # perform any log collection.
        if not self.task.is_cloned:
            self.task.save_metadata(
                "runtime",
                {
                    "return_code": returncode,
                    "killed": self.killed,
                    "success": returncode == 0,
                },
            )
            if returncode:
                if not self.killed:
                    if returncode == -11:
                        self.emit_log(
                            b"Task failed with a segmentation fault.",
                            self._stderr,
                            system_msg=True,
                        )
                    else:
                        self.emit_log(b"Task failed.", self._stderr, system_msg=True)
            else:
                num = self.task.results["_foreach_num_splits"]
                if num:
                    self.task.log(
                        "Foreach yields %d child steps." % num,
                        system_msg=True,
                        pid=self._proc.pid,
                    )
                self.task.log(
                    "Task finished successfully.", system_msg=True, pid=self._proc.pid
                )
            self.task.save_logs(
                {
                    "stdout": self._stdout.get_buffer(),
                    "stderr": self._stderr.get_buffer(),
                }
            )

        return returncode

    def __str__(self):
        return "Worker[%d]: %s" % (self._proc.pid, self.task.path)
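
# (Usage sketch for orientation only; the actual driver lives in the CLI layer and
# passes many more arguments. A NativeRuntime is roughly driven as:
#
#     runtime = NativeRuntime(flow, graph, flow_datastore, metadata, environment,
#                             package, logger, entrypoint, event_logger, monitor)
#     with runtime.run_heartbeat():
#         runtime.persist_constants()
#         runtime.execute()
#
# where all constructor arguments come from the surrounding Metaflow machinery.)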