benchmarking/utils/subprocess_with_logger.py (201 lines of code) (raw):

#!/usr/bin/env python ############################################################################## # Copyright 2017-present, Facebook, Inc. # All rights reserved. # # This source code is licensed under the license found in the # LICENSE file in the root directory of this source tree. ############################################################################## from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals import os import select import signal import subprocess import sys import time from threading import Timer from .custom_logger import getLogger from .utilities import ( setRunStatus, getRunStatus, setRunTimeout, getRunTimeout, getRunKilled, ) def processRun(*args, **kwargs): if "process_key" not in kwargs: kwargs["process_key"] = "" retryCount = 3 if "retry" in kwargs: retryCount = kwargs["retry"] while retryCount > 0: # reset run status overwritting error # from prior run setRunStatus(0, overwrite=True, key=kwargs["process_key"]) sleep = kwargs.get("retry_sleep") if sleep: getLogger().info("Sleeping for {}".format(sleep)) time.sleep(sleep) ret = _processRun(*args, **kwargs) # break out if the run succeeded if getRunStatus(key=kwargs["process_key"]) == 0: if not kwargs.get("silent", False): getLogger().info("Process Succeeded: %s", " ".join(*args)) break # don't retry for errors which we know will # fail again (ie. timeouts) if getRunTimeout(): getLogger().info("Process Failed: %s", " ".join(*args)) break retryCount -= 1 if retryCount > 0: getLogger().info( f"Process Failed (will retry {retryCount} more times): {' '.join(*args)}" ) return ret def _processRun(*args, **kwargs): if not kwargs.get("silent", False): getLogger().info(">>>>>> Running: %s", " ".join(*args)) err_output = None try: run_async = False if "async" in kwargs: run_async = kwargs["async"] non_blocking = False if "non_blocking" in kwargs and kwargs["non_blocking"]: non_blocking = True if non_blocking: _Popen(*args, **kwargs) return [], None timeout = None if "timeout" in kwargs: timeout = kwargs["timeout"] ps = _Popen(*args, **kwargs) t = None if timeout: t = Timer(timeout, _kill, [ps, " ".join(*args), kwargs["process_key"]]) t.start() if run_async: # when running the process asyncronously we return the # popen object and timer for the timeout as a tuple # it is the responsibility of the caller to pass this # tuple into processWait in order to gather the output # from the process return (ps, t), None return processWait((ps, t), **kwargs) except subprocess.CalledProcessError as e: err_output = e.output.decode("utf-8", errors="replace") getLogger().error("Command failed: {}".format(err_output)) except Exception: getLogger().error( "Unknown exception {}: {}".format(sys.exc_info()[0], " ".join(*args)) ) err_output = "{}".format(sys.exc_info()[0]) setRunStatus(1, key=kwargs["process_key"]) return [], err_output def processWait(processAndTimeout, **kwargs): silent = kwargs.get("silent", False) try: ps, t = processAndTimeout process_key = "" if "process_key" in kwargs: process_key = kwargs["process_key"] log_output = False if "log_output" in kwargs: log_output = kwargs["log_output"] ignore_status = False if "ignore_status" in kwargs: ignore_status = kwargs["ignore_status"] patterns = [] if "patterns" in kwargs: patterns = kwargs["patterns"] output, match = _getOutput(ps, patterns, process_key=process_key) ps.stdout.close() if match: # if the process is terminated by mathing output, # assume the process is executed successfully ps.terminate() status = 0 else: # wait for the process to terminate or for a kill request while not getRunKilled(): try: status = ps.wait(timeout=15.0) break except subprocess.TimeoutExpired: pass # check if we exitted loop due to a timeout request if getRunKilled(): getLogger().info("Process was killed at user request") ps.terminate() status = 0 if t is not None: t.cancel() if log_output or (status != 0 and not ignore_status): if status != 0 and not ignore_status: if not silent: getLogger().info("Process exited with status: {}".format(status)) setRunStatus(1, key=process_key) if "filter" in kwargs: output = _filterOutput(output, kwargs["filter"]) if not silent: getLogger().info( "\n\nProgram Output:\n{}\n{}\n{}\n".format( "=" * 80, "\n".join(output), "=" * 80 ) ) if status == 0 or ignore_status: setRunStatus(0, key=process_key) return output, None else: setRunStatus(1, key=process_key) return [], "\n".join(output) except subprocess.CalledProcessError as e: err_output = e.output.decode("utf-8", errors="replace") getLogger().error("Command failed: {}".format(err_output)) except Exception: err_output = "{}".format(sys.exc_info()[0]) getLogger().error("Unknown exception {}".format(sys.exc_info()[0])) return [], err_output def _filterOutput(output, match_list): length = len(output) for i, line in enumerate(output[::-1]): for match in match_list: if match in line: del output[length - i - 1] break return output def _Popen(*args, **kwargs): # only allow allowlisted args to be passed into popen customArgs = {} allowlist = ["env"] for arg in allowlist: if arg in kwargs: customArgs[arg] = kwargs[arg] ps = subprocess.Popen( *args, bufsize=-1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, preexec_fn=os.setsid, errors="replace", **customArgs, ) # We set the buffer size to system default. # this is not really recommended. However, we need to stream the # output as they are available. So we do this. But, if the data # comes in too fast and there is no time to consume them, the output # may be truncated. Now, add a buffer to reduce the problem. # will see whether this is indeed an issue later on. return ps def _getOutput(ps, patterns, process_key=""): if not isinstance(patterns, list): patterns = [patterns] poller = select.poll() poller.register(ps.stdout) lines = [] match = False while not getRunKilled(process_key): # Try to get output from binary if possible # If not possible then loop # and recheck run killed contidion if poller.poll(15.0): line = ps.stdout.readline() else: continue if not line: break nline = line.rstrip() try: # decode the string if decode exists decoded_line = nline.decode("utf-8", errors="replace") nline = decoded_line except Exception: pass lines.append(nline) for pattern in patterns: if pattern.match(nline): match = True break if match: break return lines, match def _kill(p, cmd, processKey): try: os.killpg(p.pid, signal.SIGKILL) except OSError: pass # ignore getLogger().error("Process timed out: {}".format(cmd)) setRunStatus(1, key=processKey) setRunTimeout()