# perfkitbenchmarker/vm_util.py

# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Set of utility functions for working with virtual machines.""" import contextlib import enum import logging import os import platform import posixpath import random import re import string import subprocess import tempfile import threading import time from typing import Callable, Dict, Iterable, Tuple from absl import flags import jinja2 from perfkitbenchmarker import data from perfkitbenchmarker import errors from perfkitbenchmarker import temp_dir FLAGS = flags.FLAGS # Using logger rather than logging.info to avoid stack_level problems. logger = logging.getLogger() PRIVATE_KEYFILE = 'perfkitbenchmarker_keyfile' PUBLIC_KEYFILE = 'perfkitbenchmarker_keyfile.pub' # The temporary directory on VMs. We cannot reuse GetTempDir() # because run_uri will not be available at time of module load and we need # to use this directory as a base for other module level constants. VM_TMP_DIR = '/tmp/pkb' # Default timeout for issuing a command. DEFAULT_TIMEOUT = 300 # Defaults for retrying commands. 
POLL_INTERVAL = 30
TIMEOUT = 1200
FUZZ = 0.5
MAX_RETRIES = -1

WINDOWS = 'nt'
DARWIN = 'Darwin'
PASSWORD_LENGTH = 15

# Indices into the (stdout, stderr, retcode) tuple returned by IssueCommand.
OUTPUT_STDOUT = 0
OUTPUT_STDERR = 1
OUTPUT_EXIT_CODE = 2

flags.DEFINE_integer(
    'default_timeout',
    TIMEOUT,
    'The default timeout for retryable commands in seconds.',
)
flags.DEFINE_integer(
    'burn_cpu_seconds',
    0,
    'Amount of time in seconds to burn cpu on vm before starting benchmark',
)
flags.DEFINE_integer(
    'burn_cpu_threads',
    1,
    'Number of threads to use to burn cpu before starting benchmark.',
)
flags.DEFINE_integer(
    'background_cpu_threads',
    None,
    'Number of threads of background vm_util.cpu usage while '
    'running a benchmark',
)
flags.DEFINE_integer(
    'background_network_mbits_per_sec',
    None,
    'Number of megabits per second of background '
    'network traffic to generate during the run phase '
    'of the benchmark',
)
flags.DEFINE_boolean(
    'ssh_reuse_connections',
    True,
    'Whether to reuse SSH connections rather than '
    'reestablishing a connection for each remote command.',
)
# We set this to the short value of 5 seconds so that the cluster boot benchmark
# can measure a fast connection when bringing up a VM. This avoids retries that
# may not be as quick as every 5 seconds when specifying a larger value.
flags.DEFINE_integer(
    'ssh_connect_timeout', 5, 'timeout for SSH connection.', lower_bound=0
)
flags.DEFINE_string(
    'ssh_control_path',
    None,
    'Overrides the default ControlPath setting for ssh '
    'connections if --ssh_reuse_connections is set. This can '
    'be helpful on systems whose default temporary directory '
    'path is too long (sockets have a max path length) or a '
    "version of ssh that doesn't support the %h token. See "
    'ssh documentation on the ControlPath setting for more '
    'detailed information.',
)
flags.DEFINE_string(
    'ssh_control_persist',
    '30m',
    'Setting applied to ssh connections if '
    '--ssh_reuse_connections is set. Sets how long the '
    'connections persist before they are removed. '
    'See ssh documentation about the ControlPersist setting '
    'for more detailed information.',
)
flags.DEFINE_integer(
    'ssh_server_alive_interval',
    30,
    'Value for ssh -o ServerAliveInterval. Use with '
    '--ssh_server_alive_count_max to configure how long to '
    'wait for unresponsive servers.',
)
flags.DEFINE_integer(
    'ssh_server_alive_count_max',
    10,
    'Value for ssh -o ServerAliveCountMax. Use with '
    '--ssh_server_alive_interval to configure how long to '
    'wait for unresponsive servers.',
)
_SSH_PUBLIC_KEY = flags.DEFINE_string(
    'ssh_public_key',
    None,
    'File path to the SSH public key. If None, use the newly generated one.',
)
_SSH_PRIVATE_KEY = flags.DEFINE_string(
    'ssh_private_key',
    None,
    'File path to the SSH private key. If None, use the newly generated one.',
)


class RetryError(Exception):
  """Base class for retry errors."""


class TimeoutExceededRetryError(RetryError):
  """Exception that is raised when a retryable function times out."""


class RetriesExceededRetryError(RetryError):
  """Exception that is raised when a retryable function hits its retry limit."""


class ImageNotFoundError(Exception):
  """Exception that is raised when an image is not found."""


class IpAddressSubset:
  """Enum of options for --ip_addresses."""

  REACHABLE = 'REACHABLE'
  BOTH = 'BOTH'
  INTERNAL = 'INTERNAL'
  EXTERNAL = 'EXTERNAL'

  ALL = (REACHABLE, BOTH, INTERNAL, EXTERNAL)


@enum.unique
class VmCommandLogMode(enum.Enum):
  """The log mode for vm_util.IssueCommand function."""

  ALWAYS_LOG = 'always_log'
  LOG_ON_ERROR = 'log_on_error'


_VM_COMMAND_LOG_MODE = flags.DEFINE_enum_class(
    'vm_command_log_mode',
    VmCommandLogMode.ALWAYS_LOG,
    VmCommandLogMode,
    (
        'Controls the logging behavior of vm_util.IssueCommand, and'
        ' specifically its full log statement including output & error message.'
    ),
)

flags.DEFINE_enum(
    'ip_addresses',
    IpAddressSubset.INTERNAL,
    IpAddressSubset.ALL,
    'For networking tests: use both internal and external '
    'IP addresses (BOTH), internal and external only if '
    'the receiving VM is reachable by internal IP (REACHABLE), '
    'external IP only (EXTERNAL) or internal IP only (INTERNAL). The default '
    'is set to INTERNAL.',
)

flags.DEFINE_enum(
    'background_network_ip_type',
    IpAddressSubset.EXTERNAL,
    (IpAddressSubset.INTERNAL, IpAddressSubset.EXTERNAL),
    'IP address type to use when generating background network traffic',
)


class IpAddressMetadata:
  # Metadata values recorded for samples run over internal/external IPs.
  INTERNAL = 'internal'
  EXTERNAL = 'external'


def UseProvidedSSHKeys():
  """Returns True if the user supplied both halves of an existing SSH key pair.

  Both --ssh_public_key and --ssh_private_key must be set and point at
  existing files; otherwise PKB falls back to its generated key pair.
  """
  if (
      _SSH_PUBLIC_KEY.value
      and _SSH_PRIVATE_KEY.value
      and os.path.isfile(_SSH_PUBLIC_KEY.value)
      and os.path.isfile(_SSH_PRIVATE_KEY.value)
  ):
    return True
  return False


def GetTempDir():
  """Returns the tmp dir of the current run."""
  return temp_dir.GetRunDirPath()


def PrependTempDir(file_name):
  """Returns the file name prepended with the tmp dir of the current run."""
  return os.path.join(GetTempDir(), file_name)


def GenTempDir():
  """Creates the tmp dir for the current run if it does not already exist."""
  temp_dir.CreateTemporaryDirectories()


def SSHKeyGen():
  """Uses provided SSH keys or creates PKB SSH keys in the run's tmp dir."""
  if UseProvidedSSHKeys():
    return
  if not os.path.isdir(GetTempDir()):
    GenTempDir()

  if not os.path.isfile(GetPrivateKeyPath()):
    # -N '' -> empty passphrase; -m PEM for compatibility with tools that
    # cannot read the newer OpenSSH private key format.
    create_cmd = [
        'ssh-keygen',
        '-t',
        'rsa',
        '-N',
        '',
        '-m',
        'PEM',
        '-q',
        '-f',
        PrependTempDir(PRIVATE_KEYFILE),
    ]
    IssueCommand(create_cmd)


def GetPrivateKeyPath():
  """Returns the path of the SSH private key to use for this run."""
  if UseProvidedSSHKeys():
    return _SSH_PRIVATE_KEY.value
  return PrependTempDir(PRIVATE_KEYFILE)


def GetPublicKeyPath():
  """Returns the path of the SSH public key to use for this run."""
  if UseProvidedSSHKeys():
    return _SSH_PUBLIC_KEY.value
  return PrependTempDir(PUBLIC_KEYFILE)


def GetSshOptions(ssh_key_filename, connect_timeout=None):
  """Return common set of SSH and SCP options.

  Args:
    ssh_key_filename: path of the private key passed to ssh -i.
    connect_timeout: optional override for --ssh_connect_timeout.

  Returns:
    A list of command-line arguments suitable for ssh/scp invocations.
  """
  # pyformat: disable
  options = [
      '-2',
      '-o', 'UserKnownHostsFile=/dev/null',
      '-o', 'StrictHostKeyChecking=no',
      '-o', 'IdentitiesOnly=yes',
      '-o', 'PreferredAuthentications=publickey',
      '-o', 'PasswordAuthentication=no',
      '-o', f'ConnectTimeout={connect_timeout or FLAGS.ssh_connect_timeout}',
      '-o', 'GSSAPIAuthentication=no',
      '-o', f'ServerAliveInterval={FLAGS.ssh_server_alive_interval}',
      '-o', f'ServerAliveCountMax={FLAGS.ssh_server_alive_count_max}',
      '-i', ssh_key_filename,
  ]
  # pyformat: enable
  if FLAGS.use_ipv6:
    options.append('-6')
  if FLAGS.ssh_reuse_connections:
    control_path = FLAGS.ssh_control_path or os.path.join(
        temp_dir.GetSshConnectionsDir(), '%h'
    )
    options.extend([
        '-o',
        'ControlPath="%s"' % control_path,
        '-o',
        'ControlMaster=auto',
        '-o',
        'ControlPersist=%s' % FLAGS.ssh_control_persist,
    ])
  options.extend(FLAGS.ssh_options)

  return options


def Retry(
    poll_interval=POLL_INTERVAL,
    max_retries=MAX_RETRIES,
    timeout=None,
    fuzz=FUZZ,
    log_errors=True,
    retryable_exceptions=None,
):
  """A function decorator that will retry when exceptions are thrown.

  Args:
    poll_interval: The time between tries in seconds. This is the maximum poll
      interval when fuzz is specified.
    max_retries: The maximum number of retries before giving up. If -1, this
      means continue until the timeout is reached. The function will stop
      retrying when either max_retries is met or timeout is reached.
    timeout: The timeout for all tries in seconds. If -1, this means continue
      until max_retries is met. The function will stop retrying when either
      max_retries is met or timeout is reached.
    fuzz: The amount of randomness in the sleep time. This is used to keep
      threads from all retrying at the same time. At 0, this means sleep
      exactly poll_interval seconds. At 1, this means sleep anywhere from 0 to
      poll_interval seconds.
    log_errors: A boolean describing whether errors should be logged.
    retryable_exceptions: A tuple of exceptions that should be retried. By
      default, this is None, which indicates that all exceptions should be
      retried.

  Returns:
    A function that wraps functions in retry logic. It can be used as a
    decorator.

  Raises:
    TimeoutExceededRetryError - if the provided (or default) timeout is
      exceeded while retrying the wrapped function.
    RetriesExceededRetryError - if the provided (or default) limit on the
      number of retry attempts is exceeded while retrying the wrapped function.
  """
  if retryable_exceptions is None:
    # TODO(user) Make retries less aggressive.
    retryable_exceptions = Exception

  def Wrap(f):
    """Wraps the supplied function with retry logic."""

    def WrappedFunction(*args, **kwargs):
      """Holds the retry logic."""
      local_timeout = FLAGS.default_timeout if timeout is None else timeout

      if local_timeout >= 0:
        deadline = time.time() + local_timeout
      else:
        deadline = float('inf')

      tries = 0
      while True:
        try:
          tries += 1
          return f(*args, **kwargs)
        except retryable_exceptions as e:
          # Randomize the sleep so concurrent retries do not thunder in sync.
          fuzz_multiplier = 1 - fuzz + random.random() * fuzz
          sleep_time = poll_interval * fuzz_multiplier
          if (time.time() + sleep_time) >= deadline:
            raise TimeoutExceededRetryError() from e
          elif max_retries >= 0 and tries > max_retries:
            raise RetriesExceededRetryError() from e
          else:
            if log_errors:
              logging.info(
                  'Retrying exception running %s: %s', f.__qualname__, repr(e)
              )
            time.sleep(sleep_time)

    return WrappedFunction

  return Wrap


class _BoxedObject:
  """Box a value in a reference so it is modifiable inside an inner function.

  In python3 the nonlocal keyword could be used instead - but for python2
  there is no support for modifying an external scoped variable value.
  """

  def __init__(self, initial_value):
    self.value = initial_value


def _ReadIssueCommandOutput(tf_out, tf_err):
  """Reads IssueCommand Output from stdout and stderr."""
  tf_out.seek(0)
  stdout = tf_out.read().decode('ascii', 'ignore')
  tf_err.seek(0)
  stderr = tf_err.read().decode('ascii', 'ignore')
  return stdout, stderr


def IssueCommand(
    cmd: Iterable[str],
    env: Dict[str, str] | None = None,
    timeout: int | None = DEFAULT_TIMEOUT,
    cwd: str | None = None,
    should_pre_log: bool = True,
    raise_on_failure: bool = True,
    suppress_failure: Callable[[str, str, int], bool] | None = None,
    suppress_logging: bool = False,
    raise_on_timeout: bool = True,
    stack_level: int = 1,
) -> Tuple[str, str, int]:
  """Tries running the provided command once.

  Args:
    cmd: A list of strings such as is given to the subprocess.Popen()
      constructor.
    env: A dict of key/value strings, such as is given to the subprocess.Popen()
      constructor, that contains environment variables to be injected.
    timeout: Timeout for the command in seconds. If the command has not finished
      before the timeout is reached, it will be killed. Set timeout to None to
      let the command run indefinitely. If the subprocess is killed, the return
      code will indicate an error, and stdout and stderr will contain what had
      already been written to them before the process was killed.
    cwd: Directory in which to execute the command.
    should_pre_log: A boolean indicating if command should be outputted alone
      prior to the output with command, stdout, & stderr. Useful for e.g.
      timing command length & standing out in logs.
    raise_on_failure: A boolean indicating if non-zero return codes should raise
      IssueCommandError.
    suppress_failure: A function passed (stdout, stderr, ret_code) for non-zero
      return codes to determine if the failure should be suppressed e.g. a
      delete command which fails because the item to be deleted does not exist.
    suppress_logging: A boolean indicated whether STDOUT and STDERR should be
      suppressed. Used for sensitive information.
    raise_on_timeout: A boolean indicating if killing the process due to the
      timeout being hit should raise a IssueCommandTimeoutError
    stack_level: Number of stack frames to skip & get an "interesting" caller,
      for logging. 1 skips this function, 2 skips this & its caller, etc..

  Returns:
    A tuple of stdout, stderr, and retcode from running the provided command.

  Raises:
    IssueCommandError: When raise_on_failure=True and retcode is non-zero.
    IssueCommandTimeoutError: When raise_on_timeout=True and
      command duration exceeds timeout
    ValueError: When incorrect parameters are passed in.
  """
  stack_level += 1
  if env:
    logger.debug('Environment variables: %s', env, stacklevel=stack_level)

  # Force conversion to string so you get a nice log statement before hitting a
  # type error or NPE.
  if isinstance(cmd, str):
    raise ValueError(
        f'Command must be a list of strings, but string {cmd} was received'
    )
  full_cmd = ' '.join(str(w) for w in cmd)

  if '; ' in full_cmd:
    logger.warning(
        (
            'Semicolon ; detected in command. Prefer && for better error '
            'handling. Feel free to ignore if not using semicolon to split '
            'commands. Full command: %s'
        ),
        full_cmd,
    )

  time_file_path = '/usr/bin/time'

  running_on_windows = RunningOnWindows()
  running_on_darwin = RunningOnDarwin()
  # GNU time is only wrapped around the command where it exists and timing
  # was requested via --time_commands.
  should_time = (
      not (running_on_windows or running_on_darwin)
      and os.path.isfile(time_file_path)
      and FLAGS.time_commands
  )
  shell_value = running_on_windows
  with (
      tempfile.TemporaryFile() as tf_out,
      tempfile.TemporaryFile() as tf_err,
      tempfile.NamedTemporaryFile(mode='r') as tf_timing,
  ):
    cmd_to_use = cmd
    if should_time:
      cmd_to_use = [
          time_file_path,
          '-o',
          tf_timing.name,
          '--quiet',
          '-f',
          ', WallTime:%Es, CPU:%Us, MaxMemory:%Mkb ',
      ] + list(cmd)

    if should_pre_log:
      logger.info('Running: %s', full_cmd, stacklevel=stack_level)
    try:
      process = subprocess.Popen(
          cmd_to_use,
          env=env,
          shell=shell_value,
          stdin=subprocess.PIPE,
          stdout=tf_out,
          stderr=tf_err,
          cwd=cwd,
      )
    except TypeError as e:
      # Only perform this validation after a type error, in case we are being
      # too strict.
      non_strings = [s for s in cmd if not isinstance(s, str)]
      if non_strings:
        raise ValueError(
            f'Command {cmd} contains non-string elements {non_strings}.'
        ) from e
      raise

    # Boxed so the timer callback (a separate thread) can record what happened.
    did_timeout = _BoxedObject(False)
    was_killed = _BoxedObject(False)

    def _KillProcess():
      did_timeout.value = True
      if not raise_on_timeout:
        logger.warning(
            'IssueCommand timed out after %d seconds. Killing command "%s".',
            timeout,
            full_cmd,
            stacklevel=stack_level,
        )
      process.kill()
      was_killed.value = True

    timer = threading.Timer(timeout, _KillProcess)
    timer.start()

    try:
      process.wait()
    finally:
      timer.cancel()

    stdout, stderr = _ReadIssueCommandOutput(tf_out, tf_err)

    timing_output = ''
    if should_time:
      timing_output = tf_timing.read().rstrip('\n')

    logged_stdout = '[REDACTED]' if suppress_logging else stdout
    logged_stderr = '[REDACTED]' if suppress_logging else stderr
    debug_text = 'Ran: {%s}\nReturnCode:%s%s\nSTDOUT: %s\nSTDERR: %s' % (
        full_cmd,
        process.returncode,
        timing_output,
        logged_stdout,
        logged_stderr,
    )
    if _VM_COMMAND_LOG_MODE.value == VmCommandLogMode.ALWAYS_LOG or (
        _VM_COMMAND_LOG_MODE.value == VmCommandLogMode.LOG_ON_ERROR
        and process.returncode
    ):
      logger.info(debug_text, stacklevel=stack_level)

    # Raise timeout error regardless of raise_on_failure - as the intended
    # semantics is to ignore expected errors caused by invoking the command
    # not errors from PKB infrastructure.
    if did_timeout.value and raise_on_timeout:
      debug_text = (
          '{}\nIssueCommand timed out after {} seconds. '
          '{} by perfkitbenchmarker.'.format(
              debug_text,
              timeout,
              'Process was killed'
              if was_killed.value
              else 'Process may have been killed',
          )
      )
      raise errors.VmUtil.IssueCommandTimeoutError(debug_text)
    elif process.returncode and (raise_on_failure or suppress_failure):
      if suppress_failure and suppress_failure(
          stdout, stderr, process.returncode
      ):
        # failure is suppressible, rewrite the stderr and return code as
        # passing since some callers assume either is a failure e.g.
        # perfkitbenchmarker.providers.aws.util.IssueRetryableCommand()
        return stdout, '', 0
      raise errors.VmUtil.IssueCommandError(debug_text)

  return stdout, stderr, process.returncode


def IssueBackgroundCommand(cmd, stdout_path, stderr_path, env=None):
  """Run the provided command once in the background.

  Args:
    cmd: Command to be run, as expected by subprocess.Popen.
    stdout_path: Redirect stdout here. Overwritten.
    stderr_path: Redirect stderr here. Overwritten.
    env: A dict of key/value strings, such as is given to the subprocess.Popen()
      constructor, that contains environment variables to be injected.
  """
  logging.debug('Environment variables: %s', env)

  full_cmd = ' '.join(cmd)
  logging.info('Spawning: %s', full_cmd)
  outfile = open(stdout_path, 'w')
  errfile = open(stderr_path, 'w')
  shell_value = RunningOnWindows()
  process = subprocess.Popen(
      cmd,
      env=env,
      shell=shell_value,
      stdout=outfile,
      stderr=errfile,
      close_fds=True,
  )
  # The child process holds duplicated descriptors for stdout/stderr, so the
  # parent's file objects can be closed immediately; previously they leaked
  # for the lifetime of the PKB process.
  outfile.close()
  errfile.close()
  del process  # Intentionally fire-and-forget; caller reads the output files.


@Retry()
def IssueRetryableCommand(cmd, env=None, **kwargs):
  """Tries running the provided command until it succeeds or times out.

  Args:
    cmd: A list of strings such as is given to the subprocess.Popen()
      constructor.
    env: An alternate environment to pass to the Popen command.
    **kwargs: additional arguments for the command

  Returns:
    A tuple of stdout and stderr from running the provided command.
  """
  # Additional retries will break stack_level, but works for the first one.
  kwargs['stack_level'] = kwargs.get('stack_level', 1) + 2
  stdout, stderr, retcode = IssueCommand(
      cmd, env=env, raise_on_failure=False, **kwargs
  )
  if retcode:
    debug_text = 'Ran: {%s}\nReturnCode:%s\nSTDOUT: %s\nSTDERR: %s' % (
        ' '.join(cmd),
        retcode,
        stdout,
        stderr,
    )
    raise errors.VmUtil.CalledProcessException(
        'Command returned a non-zero exit code:\n{}'.format(debug_text)
    )
  return stdout, stderr


def ParseTimeCommandResult(command_result):
  """Parse command result and get time elapsed.

  Note this parses the output of bash's time builtin, not /usr/bin/time or
  other implementations. You may need to run something like
  bash -c "time ./command" to produce parseable output.

  Args:
    command_result: The result after executing a remote time command.

  Returns:
    Time taken for the command.
  """
  # The dot is escaped so only a literal decimal point matches (the original
  # unescaped '.' matched any character between the second and third groups).
  time_data = re.findall(r'real\s+(\d+)m(\d+\.\d+)', command_result)
  time_in_seconds = 60 * float(time_data[0][0]) + float(time_data[0][1])
  return time_in_seconds


def ShouldRunOnExternalIpAddress(ip_type=None):
  """Returns whether a test should be run on an instance's external IP."""
  ip_type_to_check = ip_type or FLAGS.ip_addresses
  return ip_type_to_check in (
      IpAddressSubset.EXTERNAL,
      IpAddressSubset.BOTH,
      IpAddressSubset.REACHABLE,
  )


def ShouldRunOnInternalIpAddress(sending_vm, receiving_vm, ip_type=None):
  """Returns whether a test should be run on an instance's internal IP.

  Based on the command line flag --ip_addresses. Internal IP addresses are used
  when:

  * --ip_addresses=BOTH or --ip-addresses=INTERNAL
  * --ip_addresses=REACHABLE and 'sending_vm' can ping 'receiving_vm' on its
    internal IP.

  Args:
    sending_vm: VirtualMachine. The client.
    receiving_vm: VirtualMachine. The server.
    ip_type: optional ip_type to use instead of what is set in the FLAGS

  Returns:
    Whether a test should be run on an instance's internal IP.
  """
  ip_type_to_check = ip_type or FLAGS.ip_addresses
  return ip_type_to_check in (
      IpAddressSubset.BOTH,
      IpAddressSubset.INTERNAL,
  ) or (
      ip_type_to_check == IpAddressSubset.REACHABLE
      and sending_vm.IsReachable(receiving_vm)
  )


def GetLastRunUri():
  """Returns the last run_uri used (or None if it can't be determined)."""
  runs_dir_path = temp_dir.GetAllRunsDirPath()
  try:
    dir_names = next(os.walk(runs_dir_path))[1]
  except StopIteration:
    # The runs directory was not found.
    return None

  if not dir_names:
    # No run subdirectories were found in the runs directory.
    return None

  # Return the subdirectory with the most recent modification time.
  return max(
      dir_names, key=lambda d: os.path.getmtime(os.path.join(runs_dir_path, d))
  )


@contextlib.contextmanager
def NamedTemporaryFile(
    mode='w+b', prefix='tmp', suffix='', dir=None, delete=True
):
  """Behaves like tempfile.NamedTemporaryFile.

  The existing tempfile.NamedTemporaryFile has the annoying property on
  Windows that it cannot be opened a second time while it is already open.
  This makes it impossible to use it with a "with" statement in a cross platform
  compatible way. This serves a similar role, but allows the file to be closed
  within a "with" statement without causing the file to be unlinked until the
  context exits.

  Args:
    mode: see mode in tempfile.NamedTemporaryFile.
    prefix: see prefix in tempfile.NamedTemporaryFile.
    suffix: see suffix in tempfile.NamedTemporaryFile.
    dir: see dir in tempfile.NamedTemporaryFile.
    delete: see delete in NamedTemporaryFile.

  Yields:
    A cross platform file-like object which is "with" compatible.
  """
  f = tempfile.NamedTemporaryFile(
      mode=mode, prefix=prefix, suffix=suffix, dir=dir, delete=False
  )
  try:
    yield f
  finally:
    if not f.closed:
      f.close()
    if delete:
      os.unlink(f.name)


def GenerateSSHConfig(vms, vm_groups):
  """Generates an SSH config file to simplify connecting to the specified VMs.

  Writes a file to GetTempDir()/ssh_config with an SSH configuration for each VM
  provided in the arguments. Users can then SSH with any of the following:

      ssh -F <ssh_config_path> <vm_name>
      ssh -F <ssh_config_path> vm<vm_index>
      ssh -F <ssh_config_path> <group_name>-<index>

  Args:
    vms: list of BaseVirtualMachines.
    vm_groups: dict mapping VM group name string to list of BaseVirtualMachines.
  """
  target_file = os.path.join(GetTempDir(), 'ssh_config')
  template_path = data.ResourcePath('ssh_config.j2')
  environment = jinja2.Environment(undefined=jinja2.StrictUndefined)
  with open(template_path) as fp:
    template = environment.from_string(fp.read())
  with open(target_file, 'w') as ofp:
    ofp.write(template.render({'vms': vms, 'vm_groups': vm_groups}))

  ssh_options = [
      '  ssh -F {} {}'.format(target_file, pattern)
      for pattern in ('<vm_name>', 'vm<index>', '<group_name>-<index>')
  ]
  logging.info(
      'ssh to VMs in this benchmark by name with:\n%s', '\n'.join(ssh_options)
  )


def RunningOnWindows():
  """Returns True if PKB is running on Windows."""
  return os.name == WINDOWS


def RunningOnDarwin():
  """Returns True if PKB is running on a Darwin OS machine."""
  return os.name != WINDOWS and platform.system() == DARWIN


def ExecutableOnPath(executable_name):
  """Return True if the given executable can be found on the path."""
  cmd = ['where'] if RunningOnWindows() else ['which']
  cmd.append(executable_name)

  shell_value = RunningOnWindows()
  process = subprocess.Popen(
      cmd, shell=shell_value, stdout=subprocess.PIPE, stderr=subprocess.PIPE
  )
  process.communicate()

  if process.returncode:
    return False
  return True


def GenerateRandomWindowsPassword(
    password_length=PASSWORD_LENGTH, special_chars='*!@#$+'
):
  """Generates a password that meets Windows complexity requirements."""
  # The special characters have to be recognized by the Azure CLI as
  # special characters. This greatly limits the set of characters
  # that we can safely use. See
  # https://github.com/Azure/azure-xplat-cli/blob/master/lib/commands/arm/vm/vmOsProfile._js#L145
  # Ensure that the password contains at least one of each 4 required
  # character types starting with letters to avoid starting with chars which
  # are problematic on the command line e.g. @.
  prefix = [
      random.choice(string.ascii_lowercase),
      random.choice(string.ascii_uppercase),
      random.choice(string.digits),
      random.choice(special_chars),
  ]
  password = [
      random.choice(string.ascii_letters + string.digits + special_chars)
      for _ in range(password_length - 4)
  ]
  return ''.join(prefix + password)


def CopyFileBetweenVms(filename, src_vm, src_path, dest_vm, dest_path):
  """Copies a file from the src_vm to the dest_vm."""
  with tempfile.NamedTemporaryFile() as tf:
    temp_path = tf.name
    src_vm.RemoteCopy(
        temp_path, os.path.join(src_path, filename), copy_to=False
    )
    dest_vm.RemoteCopy(
        temp_path, os.path.join(dest_path, filename), copy_to=True
    )


def ReplaceText(vm, current_value, new_value, file_name, regex_char='/'):
  """Replaces text <current_value> with <new_value> in remote <file_name>."""
  vm.RemoteCommand(
      'sed -i -r "s{regex_char}{current_value}{regex_char}'
      '{new_value}{regex_char}" {file}'.format(
          regex_char=regex_char,
          current_value=current_value,
          new_value=new_value,
          file=file_name,
      )
  )


def DictionaryToEnvString(dictionary, joiner=' '):
  """Convert a dictionary to a space sperated 'key=value' string.

  Args:
    dictionary: the key-value dictionary to be convert
    joiner: string to separate the entries in the returned value.

  Returns:
    a string representing the dictionary
  """
  return joiner.join(
      f'{key}={value}' for key, value in sorted(dictionary.items())
  )


def CreateRemoteFile(vm, file_contents, file_path):
  """Creates a file on the remote server."""
  with NamedTemporaryFile(mode='w') as tf:
    tf.write(file_contents)
    tf.close()
    parent_dir = posixpath.dirname(file_path)
    vm.RemoteCommand(f'[ -d {parent_dir} ] || mkdir -p {parent_dir}')
    vm.PushFile(tf.name, file_path)


def ReadLocalFile(filename: str) -> str:
  """Read the local file."""
  file_path = posixpath.join(GetTempDir(), filename)
  stdout, _, _ = IssueCommand(['cat', file_path])
  return stdout