metaflow/mflog/__init__.py

import math
import time

from .mflog import refine, set_should_persist
from metaflow.util import to_unicode
from metaflow.exception import MetaflowInternalError

# Log source indicates the system that *minted the timestamp*
# for the logline. This means that for a single task we can
# assume that timestamps originating from the same source are
# monotonically increasing. Clocks are not synchronized between
# log sources, so if a file contains multiple log sources, the
# lines may not be in ascending timestamp order.
# Note that a logfile prefixed with a log source, e.g. runtime,
# may contain lines from multiple sources below it (e.g. task).
#
# Note that these file names don't match any previous log files
# (e.g. `0.stdout.log`). Older Metaflow versions will return None
# or an empty string when trying to access these new-style files.
# This is deliberate, so users won't see partial files with older
# clients.
RUNTIME_LOG_SOURCE = "runtime"
TASK_LOG_SOURCE = "task"

# Loglines from all sources need to be merged together to
# produce a complete view of logs. Hence, keep this list short
# since each item takes a DataStore access.
LOG_SOURCES = [RUNTIME_LOG_SOURCE, TASK_LOG_SOURCE]

# BASH_MFLOG defines a bash function that outputs valid mflog
# structured loglines. We use this to output properly timestamped
# loglines before the Metaflow package has been downloaded.
# Note that MFLOG_STDOUT is defined by the export_mflog_env_vars() function.
BASH_MFLOG = (
    "mflog(){ "
    "T=$(date -u -Ins|tr , .); "
    'echo \\"[MFLOG|0|${T:0:26}Z|%s|$T]$1\\"'
    " >> $MFLOG_STDOUT; echo $1; "
    " }" % TASK_LOG_SOURCE
)

BASH_SAVE_LOGS_ARGS = ["python", "-m", "metaflow.mflog.save_logs"]
BASH_SAVE_LOGS = " ".join(BASH_SAVE_LOGS_ARGS)

BASH_FLUSH_LOGS = "flush_mflogs(){ " f"{BASH_SAVE_LOGS}; " "}"


# this function returns a bash expression that redirects stdout
# and stderr of the given bash expression to mflog.tee
def bash_capture_logs(bash_expr, var_transform=None):
    if var_transform is None:
        var_transform = lambda s: "$%s" % s

    cmd = "python -m metaflow.mflog.tee %s %s"
    parts = (
        bash_expr,
        cmd % (TASK_LOG_SOURCE, var_transform("MFLOG_STDOUT")),
        cmd % (TASK_LOG_SOURCE, var_transform("MFLOG_STDERR")),
    )
    return "(%s) 1>> >(%s) 2>> >(%s >&2)" % parts


# update_delay determines how often logs should be uploaded to S3
# as a function of the task execution time
MIN_UPDATE_DELAY = 0.25  # the most frequent update interval
MAX_UPDATE_DELAY = 30.0  # the least frequent update interval


def update_delay(secs_since_start):
    # this sigmoid function reaches
    # - 0.1 after 11 minutes
    # - 0.5 after 15 minutes
    # - 1.0 after 23 minutes
    # in other words, the user will see very frequent updates
    # during the first 10 minutes
    sigmoid = 1.0 / (1.0 + math.exp(-0.01 * secs_since_start + 9.0))
    return MIN_UPDATE_DELAY + sigmoid * MAX_UPDATE_DELAY
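

# For reference, the polling intervals this schedule yields, worked out
# (approximately) from the sigmoid above:
#   update_delay(0)    ~= 0.25s   (near MIN_UPDATE_DELAY)
#   update_delay(660)  ~= 2.7s    (11 minutes in)
#   update_delay(900)  ~= 15.25s  (15 minutes in, sigmoid is exactly 0.5)
#   update_delay(1380) ~= 30.0s   (23 minutes in, near MAX_UPDATE_DELAY)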


# this function is used to generate a bash 'export' expression that
# sets environment variables that are used by 'tee' and 'save_logs'.
# Note that we can't set the env vars statically, as some of them
# may need to be evaluated during runtime
def export_mflog_env_vars(
    flow_name=None,
    run_id=None,
    step_name=None,
    task_id=None,
    retry_count=None,
    datastore_type=None,
    datastore_root=None,
    stdout_path=None,
    stderr_path=None,
):
    pathspec = "/".join((flow_name, str(run_id), step_name, str(task_id)))
    env_vars = {
        "PYTHONUNBUFFERED": "x",
        "MF_PATHSPEC": pathspec,
        "MF_DATASTORE": datastore_type,
        "MF_ATTEMPT": retry_count,
        "MFLOG_STDOUT": stdout_path,
        "MFLOG_STDERR": stderr_path,
    }
    if datastore_root is not None:
        env_vars["MF_DATASTORE_ROOT"] = datastore_root
    return "export " + " ".join("%s=%s" % kv for kv in env_vars.items())


def tail_logs(prefix, stdout_tail, stderr_tail, echo, has_log_updates):
    def _available_logs(tail, stream, echo, should_persist=False):
        try:
            for line in tail:
                if should_persist:
                    line = set_should_persist(line)
                else:
                    line = refine(line, prefix=prefix)
                echo(
                    line.strip().decode("utf-8", errors="replace"), stream, no_bold=True
                )
        except Exception as ex:
            echo(
                "%s[ temporary error in fetching logs: %s ]"
                % (to_unicode(prefix), ex),
                "stderr",
            )

    start_time = time.time()
    next_log_update = start_time
    log_update_delay = update_delay(0)
    while has_log_updates():
        if time.time() > next_log_update:
            _available_logs(stdout_tail, "stdout", echo)
            _available_logs(stderr_tail, "stderr", echo)
            now = time.time()
            log_update_delay = update_delay(now - start_time)
            next_log_update = now + log_update_delay

        # This sleep should never delay log updates. On the other hand,
        # we should exit this loop when the task has finished without
        # a long delay, regardless of the log tailing schedule
        time.sleep(min(log_update_delay, 5.0))

    # It is possible that we exit the loop above before all logs have been
    # tailed.
    _available_logs(stdout_tail, "stdout", echo)
    _available_logs(stderr_tail, "stderr", echo)


def get_log_tailer(log_url, datastore_type):
    if datastore_type == "s3":
        from metaflow.plugins.datatools.s3.s3tail import S3Tail

        return S3Tail(log_url)
    elif datastore_type == "azure":
        from metaflow.plugins.azure.azure_tail import AzureTail

        return AzureTail(log_url)
    elif datastore_type == "gs":
        from metaflow.plugins.gcp.gs_tail import GSTail

        return GSTail(log_url)
    else:
        raise MetaflowInternalError(
            "Log tailing implementation missing for datastore type %s"
            % (datastore_type,)
        )
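

# A rough sketch (for illustration only; the flow name, paths, and user
# command below are made up) of how these helpers might be composed by a
# launcher into a single shell command: export the mflog environment first,
# run the user command with its stdout/stderr teed into the mflog files,
# then persist the logs to the datastore:
#
#   env_expr = export_mflog_env_vars(
#       flow_name="HelloFlow",
#       run_id="1",
#       step_name="start",
#       task_id="2",
#       retry_count=0,
#       datastore_type="s3",
#       stdout_path="/tmp/mflog_stdout",
#       stderr_path="/tmp/mflog_stderr",
#   )
#   step_cmd = bash_capture_logs("python helloflow.py ...")
#   full_cmd = " && ".join([env_expr, step_cmd, BASH_SAVE_LOGS])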