esrally/utils/process.py (153 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import logging import os import shlex import subprocess import time from collections.abc import Iterable, Mapping from typing import IO, Callable, Optional, Union import psutil LogLevel = int FileId = int def run_subprocess(command_line: str) -> int: """ Runs the provided command line in a subprocess. :param command_line: The command line of the subprocess to launch. :return: The process' return code """ return subprocess.call(command_line, shell=True) def run_subprocess_with_output(command_line: str, env: Optional[Mapping[str, str]] = None) -> list[str]: logger = logging.getLogger(__name__) logger.debug("Running subprocess [%s] with output.", command_line) command_line_args = shlex.split(command_line) with subprocess.Popen(command_line_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env) as command_line_process: has_output = True lines = [] while has_output: assert command_line_process.stdout is not None, "stdout is None" line = command_line_process.stdout.readline() if line: lines.append(line.decode("UTF-8").strip()) else: has_output = False return lines def exit_status_as_bool(runnable: Callable[[], int], quiet: bool = False) -> bool: """ :param runnable: A runnable returning an int as exit status assuming ``0`` is meaning success. :param quiet: Suppress any output (default: False). :return: True iff the runnable has terminated successfully. """ try: return_code = runnable() return return_code == 0 or return_code is None except OSError: if not quiet: logging.getLogger(__name__).exception("Could not execute command.") return False def run_subprocess_with_logging( command_line: str, header: Optional[str] = None, level: LogLevel = logging.INFO, stdin: Optional[Union[FileId, IO[bytes]]] = None, env: Optional[Mapping[str, str]] = None, detach: bool = False, ) -> int: """ Runs the provided command line in a subprocess. All output will be captured by a logger. :param command_line: The command line of the subprocess to launch. :param header: An optional header line that should be logged (this will be logged on info level, regardless of the defined log level). :param level: The log level to use for output (default: logging.INFO). :param stdin: The stdout object returned by subprocess.Popen(stdout=PIPE) allowing chaining of shell operations with pipes (default: None). :param env: Use specific environment variables (default: None). :param detach: Whether to detach this process from its parent process (default: False). :return: The process exit code as an int. """ logger = logging.getLogger(__name__) logger.debug("Running subprocess [%s] with logging.", command_line) command_line_args = shlex.split(command_line) pre_exec = os.setpgrp if detach else None if header is not None: logger.info(header) # pylint: disable=subprocess-popen-preexec-fn with subprocess.Popen( command_line_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, env=env, stdin=stdin if stdin else None, preexec_fn=pre_exec, ) as command_line_process: stdout, _ = command_line_process.communicate() if stdout: logger.log(level=level, msg=stdout) logger.debug("Subprocess [%s] finished with return code [%s].", command_line, str(command_line_process.returncode)) return command_line_process.returncode def run_subprocess_with_logging_and_output( command_line: str, header: Optional[str] = None, level: LogLevel = logging.INFO, stdin: Optional[Union[FileId, IO[bytes]]] = None, env: Optional[Mapping[str, str]] = None, detach: bool = False, ) -> subprocess.CompletedProcess: """ Runs the provided command line in a subprocess. All output will be captured by a logger. :param command_line: The command line of the subprocess to launch. :param header: An optional header line that should be logged (this will be logged on info level, regardless of the defined log level). :param level: The log level to use for output (default: logging.INFO). :param stdin: The stdout object returned by subprocess.Popen(stdout=PIPE) allowing chaining of shell operations with pipes (default: None). :param env: Use specific environment variables (default: None). :param detach: Whether to detach this process from its parent process (default: False). :return: The process exit code as an int. """ logger = logging.getLogger(__name__) logger.debug("Running subprocess [%s] with logging.", command_line) command_line_args = shlex.split(command_line) pre_exec = os.setpgrp if detach else None if header is not None: logger.info(header) completed = subprocess.run( command_line_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env, check=False, stdin=stdin if stdin else None, preexec_fn=pre_exec, ) for stdout in completed.stdout.splitlines(): logger.log(level=level, msg=stdout) logger.debug("Subprocess [%s] finished with return code [%s].", command_line, str(completed.returncode)) return completed def is_rally_process(p: psutil.Process) -> bool: return ( p.name() == "esrally" or p.name() == "rally" or ( p.name().lower().startswith("python") and any("esrally" in e for e in p.cmdline()) and not any("esrallyd" in e for e in p.cmdline()) ) ) def find_all_other_rally_processes() -> list[psutil.Process]: others: list[psutil.Process] = [] for_all_other_processes(is_rally_process, others.append) return others def redact_cmdline(cmdline: list) -> list[str]: """ Redact client options in cmdline as it contains sensitive information like passwords """ return ["=".join((value.split("=")[0], '"*****"')) if "--client-options" in value else value for value in cmdline] def kill_all(predicate: Callable[[psutil.Process], bool]) -> None: def kill(p: psutil.Process) -> None: logging.getLogger(__name__).info( "Killing lingering process with PID [%s] and command line [%s].", p.pid, redact_cmdline(p.cmdline()) ) p.kill() # wait until process has terminated, at most 3 seconds. Otherwise we might run into race conditions with actor system # sockets that are still open. for _ in range(3): try: p.status() time.sleep(1) except psutil.NoSuchProcess: break for_all_other_processes(predicate, kill) def for_all_other_processes(predicate: Callable[[psutil.Process], bool], action: Callable[[psutil.Process], None]) -> None: # no harakiri please my_pid = os.getpid() for p in psutil.process_iter(): try: if p.pid != my_pid and predicate(p): action(p) except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess): pass def kill_running_rally_instances() -> None: def rally_process(p: psutil.Process) -> bool: return ( p.name() == "esrally" or p.name() == "rally" or ( p.name().lower().startswith("python") and any("esrally" in e for e in p.cmdline()) and not any("esrallyd" in e for e in p.cmdline()) ) ) kill_all(rally_process) def wait_for_child_processes( timeout: Optional[float] = None, callback: Optional[Callable[[psutil.Process], None]] = None, list_callback: Optional[Callable[[Iterable[psutil.Process]], None]] = None, ) -> bool: """ Waits for all child processes to terminate. :param timeout: The maximum time to wait for child processes to terminate (default: None). :param callback: A callback to call as each child process terminates. The callback will be passed the PID and the return code of the child process. :param list_callback: A callback to tell caller about the child processes that are being waited for. :return: False if no child processes found, True otherwise. """ current = psutil.Process() children = current.children(recursive=True) if not children: return False if list_callback is not None: list_callback(children) psutil.wait_procs(children, timeout=timeout, callback=callback) return True