# perfkitbenchmarker/linux_virtual_machine.py
# Copyright 2019 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing mixin classes for linux virtual machines.
These classes allow installation on both Debian and RHEL based linuxes.
They also handle some initial setup (especially on RHEL based linuxes
since by default sudo commands without a tty don't work) and
can restore the VM to the state it was in before packages were
installed.
To install a package on a VM, just call vm.Install(package_name).
The package name is just the name of the package module (i.e. the
file name minus .py). The framework will take care of all cleanup
for you.
"""
import abc
import collections
import copy
import json
import logging
import os
import pipes
import posixpath
import re
import threading
import time
from typing import Any, Dict, Set, Tuple, Union
import uuid
from absl import flags
from packaging import version as packaging_version
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import disk
from perfkitbenchmarker import errors
from perfkitbenchmarker import linux_packages
from perfkitbenchmarker import os_mixin
from perfkitbenchmarker import os_types
from perfkitbenchmarker import regex_util
from perfkitbenchmarker import sample
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker import vm_util
import yaml
FLAGS = flags.FLAGS
OS_PRETTY_NAME_REGEXP = r'PRETTY_NAME="(.*)"'
_EPEL_URL = (
'https://dl.fedoraproject.org/pub/epel/epel-release-latest-{}.noarch.rpm'
)
CLEAR_BUILD_REGEXP = r'Installed version:\s*(.*)\s*'
UPDATE_RETRIES = 5
DEFAULT_SSH_PORT = 22
REMOTE_KEY_PATH = '~/.ssh/id_rsa'
CONTAINER_MOUNT_DIR = '/mnt'
CONTAINER_WORK_DIR = '/root'
# This pair of scripts is used for executing long-running commands, which
# will be resilient in the face of SSH connection errors.
# EXECUTE_COMMAND runs a command, streaming stdout / stderr to a file, then
# writing the return code to a file. An exclusive lock is acquired on the return
# code file, so that other processes may wait for completion.
EXECUTE_COMMAND = 'execute_command.py'
# WAIT_FOR_COMMAND waits on the file lock created by EXECUTE_COMMAND,
# then copies the stdout and stderr, exiting with the status of the command run
# by EXECUTE_COMMAND.
WAIT_FOR_COMMAND = 'wait_for_command.py'
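# A minimal sketch of the flock handshake the two scripts rely on (this is
# illustrative only; the real scripts live in PKB's data directory and also
# handle status propagation and cleanup): EXECUTE_COMMAND holds an exclusive
# lock on the status file while the command runs, and a waiter blocks on that
# same lock until the command finishes.
def _wait_for_status_file_sketch(status_path: str) -> str:
  """Blocks until the running command releases its lock, then reads status."""
  import fcntl  # Unix-only, which is fine for Linux VM tooling.

  with open(status_path) as status_file:
    # Blocks while EXECUTE_COMMAND still holds LOCK_EX on this file.
    fcntl.flock(status_file, fcntl.LOCK_EX)
    return status_file.read()  # The command's return code, once written.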
_DEFAULT_DISK_FS_TYPE = 'ext4'
_DEFAULT_DISK_MOUNT_OPTIONS = 'discard'
_DEFAULT_DISK_FSTAB_OPTIONS = 'defaults'
# regex for parsing lscpu and /proc/cpuinfo
_COLON_SEPARATED_RE = re.compile(r'^\s*(?P<key>.*?)\s*:\s*(?P<value>.*?)\s*$')
_BRACKET_SEPARATED_RE = re.compile(r'^\s*(?P<key>.*\))\s*(?P<value>.*?)\s*$')
_SYSFS_CPU_PATH = '/sys/devices/system/cpu'
# TODO(user): update these to use a flag holder as recommended
# in go/python-tips/051
flags.DEFINE_bool(
'setup_remote_firewall',
False,
    'Whether PKB should configure the firewall of each remote '
    'VM to make sure it accepts all internal connections.',
)
flags.DEFINE_list(
'sysctl',
[],
    'Sysctl values to set. This flag should be a comma-separated '
    'list of path=value pairs. Each pair will be appended to '
    '/etc/sysctl.conf. '
    'For example, if you pass '
    '--sysctl=vm.dirty_background_ratio=10,vm.dirty_ratio=25, '
    'PKB will append "vm.dirty_background_ratio=10" and '
    '"vm.dirty_ratio=25" on separate lines to /etc/sysctl.conf.',
)
flags.DEFINE_bool(
'reboot_after_changing_sysctl',
False,
'Whether PKB should reboot after applying sysctl changes',
)
flags.DEFINE_list(
'set_files',
[],
'Arbitrary filesystem configuration. This flag should be a '
'comma-separated list of path=value pairs. Each value will '
'be written to the corresponding path. For example, if you '
'pass --set_files=/sys/kernel/mm/transparent_hugepage/enabled=always, '
'then PKB will write "always" to '
'/sys/kernel/mm/transparent_hugepage/enabled before starting '
'the benchmark.',
)
flags.DEFINE_bool(
'network_enable_BBR',
False,
    'A shortcut to enable BBR congestion control on the network. '
    'Equivalent to appending to --sysctl the values '
    '"net.core.default_qdisc=fq" and '
    '"net.ipv4.tcp_congestion_control=bbr".',
)
flags.DEFINE_integer(
'num_disable_cpus',
None,
    'Number of CPUs to disable on the virtual machine. '
    'If the VM has n CPUs, you can disable at most n-1.',
lower_bound=1,
)
flags.DEFINE_integer('disk_fill_size', 0, 'Size of file to create in GBs.')
flags.DEFINE_enum(
'disk_fs_type',
_DEFAULT_DISK_FS_TYPE,
[_DEFAULT_DISK_FS_TYPE, 'xfs'],
'File system type used to format disk.',
)
flags.DEFINE_integer(
'disk_block_size',
None,
    'Block size to format disk with. Defaults to 4096 for ext4.',
)
flags.DEFINE_bool(
'enable_transparent_hugepages',
None,
'Whether to enable or '
'disable transparent hugepages. If unspecified, the setting '
'is unchanged from the default in the OS.',
)
flags.DEFINE_integer(
'ssh_retries',
10,
'Default number of times to retry transient failures on SSH/SCP commands.',
lower_bound=1,
)
flags.DEFINE_integer(
    'scp_connect_timeout', 30, 'Timeout for SCP connections.', lower_bound=0
)
flags.DEFINE_string(
'append_kernel_command_line',
None,
'String to append to the kernel command line. The presence of any '
'non-empty string will cause a reboot to occur after VM prepare. '
'If unspecified, the kernel command line will be unmodified.',
)
flags.DEFINE_integer(
'tcp_max_receive_buffer',
None,
'Changes the third component of the sysctl value net.ipv4.tcp_rmem. '
'This sets the maximum receive buffer for TCP socket connections in bytes. '
'Increasing this value may increase single stream TCP throughput '
    'for high-latency connections.',
)
flags.DEFINE_integer(
'tcp_max_send_buffer',
None,
'Changes the third component of the sysctl value net.ipv4.tcp_wmem. '
'This sets the maximum send buffer for TCP socket connections in bytes. '
'Increasing this value may increase single stream TCP throughput '
    'for high-latency connections.',
)
_TCP_MAX_NOTSENT_BYTES = flags.DEFINE_integer(
'tcp_max_notsent_bytes',
None,
'Changes the third component of the sysctl value '
'net.ipv4.tcp_notsent_lowat. This sets the maximum number of unsent bytes '
    'for TCP socket connections. Decreasing this value may reduce usage '
    'of kernel memory.',
)
flags.DEFINE_integer(
'rmem_max',
None,
'Sets the sysctl value net.core.rmem_max. This sets the max OS '
'receive buffer size in bytes for all types of connections',
)
flags.DEFINE_integer(
'wmem_max',
None,
'Sets the sysctl value net.core.wmem_max. This sets the max OS '
'send buffer size in bytes for all types of connections',
)
flags.DEFINE_boolean(
'gce_hpc_tools', False, 'Whether to apply the hpc-tools environment script.'
)
flags.DEFINE_boolean(
'disable_smt',
False,
'Whether to disable SMT (Simultaneous Multithreading) in BIOS.',
)
flags.DEFINE_boolean(
'use_numcpu_multi_files',
False,
'Whether to use /sys/fs/cgroup/cpuset.cpus.effective, '
'/dev/cgroup/cpuset.cpus.effective, /proc/self/status, '
'/proc/cpuinfo to extract the number of CPUs.',
)
flags.DEFINE_boolean(
'use_cgroup_memory_limits',
False,
'Whether to use the cgroup memory limits, read from '
'/sys/fs/cgroup/memory/{container_name}/memory.limit_in_bytes, '
'to extract the total available memory capacity in the container.',
)
flags.DEFINE_integer(
'visible_core_count', None, 'To customize the number of visible CPU cores.'
)
_DISABLE_YUM_CRON = flags.DEFINE_boolean(
'disable_yum_cron', True, 'Whether to disable the cron-run yum service.'
)
_KERNEL_MODULES_TO_ADD = flags.DEFINE_list(
'kernel_modules_to_add', [], 'Kernel modules to add to Linux VMs'
)
_KERNEL_MODULES_TO_REMOVE = flags.DEFINE_list(
'kernel_modules_to_remove', [], 'Kernel modules to remove from Linux VMs'
)
_DISABLE_CSTATE_BY_NAME_AND_DEEPER = flags.DEFINE_string(
'disable_cstate_by_name_and_deeper',
None,
    'When specified, c-states that match the given name, and all deeper'
    ' c-states, are disabled. For instance, if C1E is specified for a VM'
    ' running the intel_idle driver, then C1E and C6 are disabled, but C1'
    ' remains enabled.',
)
_ENABLE_NVME_INTERRUPT_COALEASING = flags.DEFINE_bool(
'enable_nvme_interrupt_coaleasing',
False,
    'Attempt to enable interrupt coalescing for all the NVMe disks on this '
    'VM. Currently only implemented for local disks. Depending on the '
    'guest, this command may or may not actually modify the interrupt '
    'coalescing behavior.',
)
# RHEL package managers
YUM = 'yum'
DNF = 'dnf'
RETRYABLE_SSH_RETCODE = 255
# Using the root logger removes one function call that logging.info otherwise
# adds to the stack level. Can remove after Python 3.11; see:
# https://bugs.python.org/issue45171
logger = logging.getLogger()
def ParseRangeList(csv_list: str) -> set[int]:
"""Parses a comma separated list of numbers and/or ranges into a set of ints.
Args:
csv_list: The CSV list to parse.
Returns:
A set of integers.
"""
if not csv_list:
return set()
answer: set[int] = set()
if ',' in csv_list:
items_to_convert = csv_list.split(',')
else:
items_to_convert = [csv_list]
for item_value in items_to_convert:
if '-' not in item_value:
answer.add(int(item_value))
continue
lhs, rhs = item_value.split('-')
try:
lhs = int(lhs)
rhs = int(rhs)
except ValueError as exc:
raise ValueError(f'Invalid range: [{item_value}]') from exc
if lhs > rhs:
raise ValueError(f'Invalid range found while parsing: [{lhs}-{rhs}]')
answer.update(range(lhs, rhs + 1))
return answer
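# Illustrative-only examples of ParseRangeList, mirroring the range formats
# found in cgroup cpuset files and /proc/self/status (see GetCpusAllowedSet
# below); not used by PKB itself:
def _parse_range_list_examples() -> None:
  assert ParseRangeList('0-3,8') == {0, 1, 2, 3, 8}
  assert ParseRangeList('5') == {5}
  assert ParseRangeList('') == set()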
class CpuVulnerabilities:
"""The 3 different vulnerability statuses from vm.cpu_vulernabilities.
Example input:
/sys/devices/system/cpu/vulnerabilities/itlb_multihit:KVM: Vulnerable
Is put into vulnerability with a key of "itlb_multihit" and value "KVM"
Unparsed lines are put into the unknown dict.
"""
def __init__(self):
self.mitigations: Dict[str, str] = {}
self.vulnerabilities: Dict[str, str] = {}
self.notaffecteds: Set[str] = set()
self.unknowns: Dict[str, str] = {}
def AddLine(self, full_line: str) -> None:
"""Parses a line of output from the cpu/vulnerabilities/* files."""
if not full_line:
return
file_path, line = full_line.split(':', 1)
file_name = posixpath.basename(file_path)
if self._AddMitigation(file_name, line):
return
if self._AddVulnerability(file_name, line):
return
if self._AddNotAffected(file_name, line):
return
self.unknowns[file_name] = line
def _AddMitigation(self, file_name, line):
match = re.match('^Mitigation: (.*)', line) or re.match(
'^([^:]+): Mitigation: (.*)$', line
)
if match:
self.mitigations[file_name] = ':'.join(match.groups())
return True
def _AddVulnerability(self, file_name, line):
match = (
re.match('^Vulnerable: (.*)', line)
or re.match('^Vulnerable$', line)
or re.match('^([^:]+): Vulnerable$', line)
)
if match:
self.vulnerabilities[file_name] = ':'.join(match.groups())
return True
def _AddNotAffected(self, file_name, line):
match = re.match('^Not affected$', line)
if match:
self.notaffecteds.add(file_name)
return True
@property
def asdict(self) -> Dict[str, str]:
"""Returns the parsed CPU vulnerabilities as a dict."""
ret = {}
if self.mitigations:
ret['mitigations'] = ','.join(sorted(self.mitigations))
for key, value in self.mitigations.items():
ret[f'mitigation_{key}'] = value
if self.vulnerabilities:
ret['vulnerabilities'] = ','.join(sorted(self.vulnerabilities))
for key, value in self.vulnerabilities.items():
ret[f'vulnerability_{key}'] = value
if self.unknowns:
ret['unknowns'] = ','.join(self.unknowns)
for key, value in self.unknowns.items():
ret[f'unknown_{key}'] = value
if self.notaffecteds:
ret['notaffecteds'] = ','.join(sorted(self.notaffecteds))
return ret
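# An illustrative-only walkthrough of CpuVulnerabilities parsing, using sample
# lines in the colon-joined format described in the class docstring (the
# specific values are hypothetical); not used by PKB itself:
def _cpu_vulnerabilities_example() -> None:
  cv = CpuVulnerabilities()
  prefix = '/sys/devices/system/cpu/vulnerabilities'
  cv.AddLine(f'{prefix}/itlb_multihit:KVM: Vulnerable')
  cv.AddLine(f'{prefix}/meltdown:Mitigation: PTI')
  cv.AddLine(f'{prefix}/l1tf:Not affected')
  assert cv.vulnerabilities == {'itlb_multihit': 'KVM'}
  assert cv.mitigations == {'meltdown': 'PTI'}
  assert cv.notaffecteds == {'l1tf'}
  assert cv.asdict['vulnerability_itlb_multihit'] == 'KVM'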
class KernelRelease:
"""Holds the contents of the linux kernel version returned from uname -r."""
def __init__(self, uname: str):
"""KernelVersion Constructor.
Args:
uname: A string in the format of "uname -r" command
"""
# example format would be: "4.5.0-96-generic"
# or "3.10.0-514.26.2.el7.x86_64" for centos
# major.minor.Rest
# in this example, major = 4, minor = 5
self.name = uname
major_string, minor_string, _ = uname.split('.', 2)
self.major = int(major_string)
self.minor = int(minor_string)
def AtLeast(self, major, minor):
"""Check If the kernel version meets a minimum bar.
The kernel version needs to be at least as high as the major.minor
specified in args.
Args:
major: The major number to test, as an integer
minor: The minor number to test, as an integer
Returns:
True if the kernel version is at least as high as major.minor,
False otherwise
"""
if self.major < major:
return False
if self.major > major:
return True
return self.minor >= minor
def __repr__(self) -> str:
return self.name
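# Illustrative-only examples of KernelRelease comparisons (not used by PKB):
def _kernel_release_examples() -> None:
  kernel = KernelRelease('4.5.0-96-generic')
  assert (kernel.major, kernel.minor) == (4, 5)
  assert kernel.AtLeast(4, 5)  # An equal major.minor passes.
  assert kernel.AtLeast(3, 10)  # A higher major passes regardless of minor.
  assert not kernel.AtLeast(4, 9)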
class BaseLinuxMixin(os_mixin.BaseOsMixin):
"""Class that holds Linux related VM methods and attributes."""
# If multiple ssh calls are made in parallel using -t it will mess
# the stty settings up and the terminal will become very hard to use.
# Serializing calls to ssh with the -t option fixes the problem.
_pseudo_tty_lock = threading.Lock()
# this command might change depending on the OS, but most linux distributions
# can use the following command
INIT_RAM_FS_CMD = 'sudo update-initramfs -u'
# regex to get the network devices from "ip link show"
_IP_LINK_RE_DEVICE_MTU = re.compile(
r'^\d+: (?P<device_name>\S+):.*mtu (?P<mtu>\d+)'
)
# device prefixes to ignore from "ip link show"
# TODO(spencerkim): Record ib device metadata.
_IGNORE_NETWORK_DEVICE_PREFIXES = ('lo', 'docker', 'ib')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# N.B. If you override ssh_port you must override remote_access_ports and
# primary_remote_access_port.
self.ssh_port = DEFAULT_SSH_PORT
self.remote_access_ports = [self.ssh_port]
self.primary_remote_access_port = self.ssh_port
self.has_private_key = False
self.ssh_external_time = None
self.ssh_internal_time = None
self._remote_command_script_upload_lock = threading.Lock()
self._has_remote_command_script = False
self._needs_reboot = False
self._lscpu_cache = None
self._partition_table = {}
self._proccpu_cache = None
self._smp_affinity_script = None
self.name: str
self._os_info: str | None = None
self._kernel_release: KernelRelease | None = None
self._cpu_arch: str | None = None
self._kernel_command_line: str | None = None
self._network_device_mtus = None
def _Suspend(self):
"""Suspends a VM."""
raise NotImplementedError()
def _Resume(self):
"""Resumes a VM."""
raise NotImplementedError()
def _BeforeSuspend(self):
pass
def _CreateVmTmpDir(self):
self.RemoteCommand('mkdir -p %s' % vm_util.VM_TMP_DIR)
self.RemoteCommand('sudo chmod 755 %s' % vm_util.VM_TMP_DIR)
def _SetTransparentHugepages(self):
"""Sets transparent hugepages based on --enable_transparent_hugepages.
If the flag is unset (None), this is a nop.
"""
if FLAGS.enable_transparent_hugepages is None:
return
setting = 'always' if FLAGS.enable_transparent_hugepages else 'never'
self.RemoteCommand(
'echo %s | sudo tee /sys/kernel/mm/transparent_hugepage/enabled'
% setting
)
self.os_metadata['transparent_hugepage'] = setting
def _DisableCstates(self):
"""Disable cstates that either match or deeper than the given cstate."""
cstate = _DISABLE_CSTATE_BY_NAME_AND_DEEPER.value
if not cstate:
return
cstates = self._GetOrderedCstates()
if not cstates:
raise ValueError(
'No cstates found, the system does not support disabling cstates'
)
logging.info('Available cstates listed in order: %s', cstates)
if cstate not in cstates:
raise ValueError(f'Requested cstate {cstate} is not present in {cstates}')
num_cpus = self.num_cpus or self._GetNumCpus()
start_index = cstates.index(cstate)
disabled_cstates = []
for index in range(start_index, len(cstates)):
for cpu_id in range(num_cpus):
config_path = (
f'{_SYSFS_CPU_PATH}/cpu{cpu_id}/cpuidle/state{index}/disable'
)
self.RemoteCommand(f'echo 1 | sudo tee {config_path}')
disabled_cstates.append(cstates[index])
self.os_metadata['disabled_cstates'] = ','.join(disabled_cstates)
def _GetOrderedCstates(self) -> list[str] | None:
"""Returns the ordered cstates by querying the sysfs cpuidle path.
The ordering is obtained by the alphabetical wildcard expansion.
"""
query_paths = f'{_SYSFS_CPU_PATH}/cpu0/cpuidle/state*/name'
if not self._RemoteFileExists(query_paths):
return None
out, _ = self.RemoteCommand(f'cat {query_paths}')
return list(filter(None, out.split('\n')))
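  # For example, a VM running the intel_idle driver might report the ordered
  # names ['POLL', 'C1', 'C1E', 'C6'] (hypothetical); with
  # --disable_cstate_by_name_and_deeper=C1E, _DisableCstates would then write
  # a 1 to .../state2/disable and .../state3/disable for every CPU, leaving
  # POLL and C1 enabled.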
def _SetupRobustCommand(self):
"""Sets up the RobustRemoteCommand tooling.
This includes installing python3 and pushing scripts required by
RobustRemoteCommand to this VM. There is a check to skip if previously
installed.
"""
with self._remote_command_script_upload_lock:
if not self._has_remote_command_script:
# Python is needed for RobustRemoteCommands
self.Install('python')
for f in (EXECUTE_COMMAND, WAIT_FOR_COMMAND):
remote_path = os.path.join(vm_util.VM_TMP_DIR, os.path.basename(f))
if os.path.basename(remote_path):
self.RemoteCommand('sudo rm -f ' + remote_path)
self.PushDataFile(f, remote_path)
self._has_remote_command_script = True
def RobustRemoteCommand(
self,
command: str,
timeout: float | None = None,
ignore_failure: bool = False,
) -> Tuple[str, str]:
"""Runs a command on the VM in a more robust way than RemoteCommand.
This is used for long-running commands that might experience network issues
that would normally interrupt a RemoteCommand and fail to provide results.
Executes a command via a pair of scripts on the VM:
* EXECUTE_COMMAND, which runs 'command' in a nohupped background process.
* WAIT_FOR_COMMAND, which first waits on confirmation that EXECUTE_COMMAND
has acquired an exclusive lock on a file with the command's status. This
is done by waiting for the existence of a file written by EXECUTE_COMMAND
once it successfully acquires an exclusive lock. Once confirmed,
      WAIT_FOR_COMMAND waits to acquire the file lock held by EXECUTE_COMMAND
      until
'command' completes, then returns with the stdout, stderr, and exit status
of 'command'.
    Temporary SSH failures (where ssh returns 255) while waiting for the
    command to complete are tolerated and safely retried. However, if the
    remote command itself returns 255, the wait script returns 1 instead, to
    bypass the retry behavior.
Args:
command: The command to run.
timeout: The timeout for the command in seconds.
ignore_failure: Ignore any failure if set to true.
Returns:
A tuple of stdout, stderr from running the command.
Raises:
RemoteCommandError: If there was a problem establishing the connection, or
the command fails.
"""
self._SetupRobustCommand()
logger.info(
'Running RobustRemoteCommand on %s: %s',
self.name,
command,
stacklevel=2,
)
execute_path = os.path.join(
vm_util.VM_TMP_DIR, os.path.basename(EXECUTE_COMMAND)
)
wait_path = os.path.join(
vm_util.VM_TMP_DIR, os.path.basename(WAIT_FOR_COMMAND)
)
uid = uuid.uuid4()
file_base = os.path.join(vm_util.VM_TMP_DIR, 'cmd%s' % uid)
wrapper_log = file_base + '.log'
stdout_file = file_base + '.stdout'
stderr_file = file_base + '.stderr'
status_file = file_base + '.status'
exclusive_file = file_base + '.exclusive'
if not isinstance(command, str):
command = ' '.join(command)
start_command = ['nohup', 'python3', execute_path,
'--stdout', stdout_file,
'--stderr', stderr_file,
'--status', status_file,
'--exclusive', exclusive_file,
'--command', pipes.quote(command)] # pyformat: disable
if timeout:
start_command.extend(['--timeout', str(timeout)])
start_command = '%s 1> %s 2>&1 &' % (' '.join(start_command), wrapper_log)
self.RemoteCommand(start_command, stack_level=2)
def _WaitForCommand():
wait_command = ['python3', wait_path,
'--status', status_file,
'--exclusive', exclusive_file] # pyformat: disable
stdout = ''
while 'Command finished.' not in stdout:
logging.info('Waiting for original cmd: %s', command, stacklevel=3)
stdout, _ = self.RemoteCommand(
' '.join(wait_command),
timeout=1800,
should_pre_log=False,
stack_level=3,
)
wait_command.extend([
'--stdout', stdout_file,
'--stderr', stderr_file,
'--delete',
]) # pyformat: disable
logging.info(
'Finished waiting, printing stdout & stderr for cmd: %s',
command,
stacklevel=3,
)
return self.RemoteCommand(
' '.join(wait_command),
ignore_failure=ignore_failure,
should_pre_log=False,
stack_level=3,
)
try:
return _WaitForCommand()
except errors.VirtualMachine.RemoteCommandError:
# In case the error was with the wrapper script itself, print the log.
stdout, _ = self.RemoteCommand('cat %s' % wrapper_log, stack_level=2)
if stdout.strip():
logging.warning(
'Exception during RobustRemoteCommand. Wrapper script log:\n%s',
stdout,
)
raise
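  # A usage sketch for RobustRemoteCommand (the command and timeout are
  # hypothetical): run a long benchmark that should survive transient SSH
  # drops, then collect its output once it completes.
  #
  #   stdout, _ = vm.RobustRemoteCommand('./run_benchmark.sh', timeout=7200)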
def SetupRemoteFirewall(self):
"""Sets up IP table configurations on the VM."""
self.RemoteHostCommand('sudo iptables -A INPUT -j ACCEPT')
self.RemoteHostCommand('sudo iptables -A OUTPUT -j ACCEPT')
def SetupProxy(self):
"""Sets up proxy configuration variables for the cloud environment."""
env_file = '/etc/environment'
commands = []
if FLAGS.http_proxy:
commands.append(
"echo 'http_proxy=%s' | sudo tee -a %s" % (FLAGS.http_proxy, env_file)
)
if FLAGS.https_proxy:
commands.append(
"echo 'https_proxy=%s' | sudo tee -a %s"
% (FLAGS.https_proxy, env_file)
)
if FLAGS.ftp_proxy:
commands.append(
"echo 'ftp_proxy=%s' | sudo tee -a %s" % (FLAGS.ftp_proxy, env_file)
)
if commands:
self.RemoteCommand(';'.join(commands))
def SetupPackageManager(self):
"""Specific Linux flavors should override this."""
pass
def PrepareVMEnvironment(self):
super().PrepareVMEnvironment()
self._SetNumCpus()
self.SetupProxy()
self._CreateVmTmpDir()
self._SetTransparentHugepages()
self._DisableCstates()
if FLAGS.setup_remote_firewall:
self.SetupRemoteFirewall()
if self.install_packages:
self._CreateInstallDir()
if self.is_static:
self.SnapshotPackages()
# TODO(user): Only setup if necessary.
# Call SetupPackageManager lazily from HasPackage/InstallPackages like
# ShouldDownloadPreprovisionedData sets up object storage CLIs.
self.SetupPackageManager()
self.SetFiles()
self.DoSysctls()
self._DoAppendKernelCommandLine()
self.ModifyKernelModules()
self.DoConfigureNetworkForBBR()
self.DoConfigureTCPWindow()
self.UpdateEnvironmentPath()
self._DisableCpus()
self._RebootIfNecessary()
self.BurnCpu()
self.FillDisk()
def _CreateInstallDir(self):
self.RemoteCommand(
('sudo mkdir -p {0}; sudo chmod a+rwxt {0}').format(
linux_packages.INSTALL_DIR
)
)
# LinuxMixins do not implement _Start or _Stop
def _Start(self):
"""Starts the VM."""
raise NotImplementedError()
def _Stop(self):
"""Stops the VM."""
raise NotImplementedError()
def SetFiles(self):
"""Apply --set_files to the VM."""
for pair in FLAGS.set_files:
path, value = pair.split('=')
self.RemoteCommand('echo "%s" | sudo tee %s' % (value, path))
def _DisableCpus(self):
"""Apply num_disable_cpus to the VM.
Raises:
ValueError: if num_disable_cpus is outside of (0 ... num_cpus-1)
inclusive
"""
if not FLAGS.num_disable_cpus:
return
self.num_disable_cpus = FLAGS.num_disable_cpus
if self.num_disable_cpus <= 0 or self.num_disable_cpus >= self.num_cpus:
raise ValueError(
'num_disable_cpus must be between 1 and '
'(num_cpus - 1) inclusive. '
'num_disable_cpus: %i, num_cpus: %i'
% (self.num_disable_cpus, self.num_cpus)
)
    # We can't disable cpu 0, so we start from the last cpu in /proc/cpuinfo.
    # On multiprocessor systems, we also attempt to disable cpus on each
    # physical processor based on "physical id" in order to keep a similar
    # number of cpus on each physical processor.
    # In addition, for each cpu we disable, we look for the cpu with the same
    # "core id" in order to disable vcpu pairs.
cpus = copy.deepcopy(self.CheckProcCpu().mappings)
cpu_mapping = collections.defaultdict(list)
for cpu, info in cpus.items():
numa = info.get('physical id')
cpu_mapping[int(numa)].append((cpu, int(info.get('core id'))))
# Sort cpus based on 'core id' on each numa node
for numa in cpu_mapping:
cpu_mapping[numa] = sorted(
cpu_mapping[numa], key=lambda cpu_info: (cpu_info[1], cpu_info[0])
)
def _GetNextCPUToDisable(num_disable_cpus):
"""Get the next CPU id to disable."""
numa_nodes = list(cpu_mapping)
while num_disable_cpus:
for numa in sorted(numa_nodes, reverse=True):
cpu_id, _ = cpu_mapping[numa].pop()
num_disable_cpus -= 1
yield cpu_id
if not num_disable_cpus:
break
for cpu_id in _GetNextCPUToDisable(self.num_disable_cpus):
self.RemoteCommand(
f'sudo bash -c "echo 0 > /sys/devices/system/cpu/cpu{cpu_id}/online"'
)
self._proccpu_cache = None
self._lscpu_cache = None
def UpdateEnvironmentPath(self):
"""Specific Linux flavors should override this."""
pass
def FillDisk(self):
"""Fills the primary scratch disk with a zeros file."""
if FLAGS.disk_fill_size:
out_file = posixpath.join(self.scratch_disks[0].mount_point, 'fill_file')
self.RobustRemoteCommand(
'dd if=/dev/zero of={out_file} bs=1G count={fill_size}'.format(
out_file=out_file, fill_size=FLAGS.disk_fill_size
)
)
def _ApplySysctlPersistent(self, sysctl_params):
"""Apply "key=value" pairs to /etc/sysctl.conf and load via sysctl -p.
These values should remain persistent across future reboots.
Args:
sysctl_params: dict - the keys and values to write
"""
if not sysctl_params:
return
for key, value in sysctl_params.items():
self.RemoteCommand(
'sudo bash -c \'echo "%s=%s" >> /etc/sysctl.conf\'' % (key, value)
)
# See https://www.golinuxcloud.com/sysctl-reload-without-reboot/
self.RemoteCommand('sudo sysctl -p')
def ApplySysctlPersistent(self, sysctl_params, should_reboot=False):
"""Apply "key=value" pairs to /etc/sysctl.conf and load via sysctl -p.
These values should remain persistent across future reboots.
Args:
sysctl_params: dict - the keys and values to write
should_reboot: bool - whether to reboot after applying sysctl changes
"""
self._ApplySysctlPersistent(sysctl_params)
if should_reboot or FLAGS.reboot_after_changing_sysctl:
self.Reboot()
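  # Example (hypothetical values): persistently raise the OS-wide receive
  # buffer cap and reboot so that every service picks up the change.
  #
  #   vm.ApplySysctlPersistent({'net.core.rmem_max': 16777216},
  #                            should_reboot=True)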
def DoSysctls(self):
"""Apply --sysctl to the VM.
The Sysctl pairs are written persistently so that if a reboot
occurs, the flags are not lost.
"""
sysctl_params = {}
for pair in FLAGS.sysctl:
key, value = pair.split('=')
sysctl_params[key] = value
self._ApplySysctlPersistent(sysctl_params)
def DoConfigureNetworkForBBR(self):
"""Apply --network_enable_BBR to the VM."""
if not FLAGS.network_enable_BBR:
return
if not self.kernel_release.AtLeast(4, 9):
raise flags.ValidationError(
'BBR requires a linux image with kernel 4.9 or newer'
)
# if the current congestion control mechanism is already BBR
# then nothing needs to be done (avoid unnecessary reboot)
if self.TcpCongestionControl() == 'bbr':
return
self._ApplySysctlPersistent({
'net.core.default_qdisc': 'fq',
'net.ipv4.tcp_congestion_control': 'bbr',
})
def DoConfigureTCPWindow(self):
"""Change TCP window parameters in sysctl."""
possible_tcp_flags = [
FLAGS.tcp_max_receive_buffer,
FLAGS.tcp_max_send_buffer,
_TCP_MAX_NOTSENT_BYTES.value,
FLAGS.rmem_max,
FLAGS.wmem_max,
]
# Return if none of these flags are set
if all(x is None for x in possible_tcp_flags):
return
# Get current values from VM
stdout, _ = self.RemoteCommand('cat /proc/sys/net/ipv4/tcp_rmem')
rmem_values = stdout.split()
stdout, _ = self.RemoteCommand('cat /proc/sys/net/ipv4/tcp_wmem')
wmem_values = stdout.split()
stdout, _ = self.RemoteCommand('cat /proc/sys/net/ipv4/tcp_notsent_lowat')
notsent_lowat_values = stdout.split()
stdout, _ = self.RemoteCommand('cat /proc/sys/net/core/rmem_max')
rmem_max = int(stdout)
stdout, _ = self.RemoteCommand('cat /proc/sys/net/core/wmem_max')
wmem_max = int(stdout)
# third number is max receive/send
max_receive = rmem_values[2]
max_send = wmem_values[2]
max_not_sent = notsent_lowat_values[0]
logging.info('notsent[0]: %s', notsent_lowat_values[0])
# if flags are set, override current values from vm
if FLAGS.tcp_max_receive_buffer:
max_receive = FLAGS.tcp_max_receive_buffer
if FLAGS.tcp_max_send_buffer:
max_send = FLAGS.tcp_max_send_buffer
if _TCP_MAX_NOTSENT_BYTES.value:
max_not_sent = _TCP_MAX_NOTSENT_BYTES.value
if FLAGS.rmem_max:
rmem_max = FLAGS.rmem_max
if FLAGS.wmem_max:
wmem_max = FLAGS.wmem_max
# Add values to metadata
self.os_metadata['tcp_max_receive_buffer'] = max_receive
self.os_metadata['tcp_max_send_buffer'] = max_send
self.os_metadata['tcp_max_notsent_bytes'] = max_not_sent
self.os_metadata['rmem_max'] = rmem_max
self.os_metadata['wmem_max'] = wmem_max
rmem_string = '{} {} {}'.format(rmem_values[0], rmem_values[1], max_receive)
wmem_string = '{} {} {}'.format(wmem_values[0], wmem_values[1], max_send)
    logging.info('rmem_string: %s wmem_string: %s', rmem_string, wmem_string)
not_sent_string = '{}'.format(max_not_sent)
self._ApplySysctlPersistent({
'net.ipv4.tcp_rmem': rmem_string,
'net.ipv4.tcp_wmem': wmem_string,
'net.ipv4.tcp_notsent_lowat': not_sent_string,
'net.core.rmem_max': rmem_max,
'net.core.wmem_max': wmem_max,
})
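  # For example (illustrative defaults): if the VM reports
  # 'net.ipv4.tcp_rmem = 4096 131072 425984' and the run sets
  # --tcp_max_receive_buffer=6291456, the value persisted above becomes
  # 'net.ipv4.tcp_rmem = 4096 131072 6291456'; only the third (max)
  # component is replaced.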
def _RebootIfNecessary(self):
"""Will reboot the VM if self._needs_reboot has been set."""
if self._needs_reboot:
self.Reboot()
self._needs_reboot = False
def TcpCongestionControl(self):
"""Return the congestion control used for tcp."""
try:
resp, _ = self.RemoteCommand(
'cat /proc/sys/net/ipv4/tcp_congestion_control'
)
return resp.rstrip('\n')
except errors.VirtualMachine.RemoteCommandError:
return 'unknown'
def GetCPUVersion(self):
"""Get the CPU version of the VM.
Refer to flag definition '--required_cpu_version' in virtual_machine.py for
more details.
Returns:
guest_arch: str.
"""
proccpu_results = self.CheckProcCpu(check_cache=False).GetValues()
if 'vendor_id' in proccpu_results:
vendor = proccpu_results.get('vendor_id', 'UnknownVendor')
family = proccpu_results.get('cpu family', 'UnknownFamily')
model = proccpu_results.get('model', 'UnknownModel')
stepping = proccpu_results.get('stepping', 'UnknownStepping')
guest_arch = f'{vendor}_{family}_{model}_{stepping}'
else:
implementer = proccpu_results.get('CPU implementer', 'UnknownVendor')
arch = proccpu_results.get('CPU architecture', 'UnknownArchitecture')
variant = proccpu_results.get('CPU variant', 'UnknownVariant')
part = proccpu_results.get('CPU part', 'UnknownPart')
guest_arch = f'{implementer}_{arch}_{variant}_{part}'
return guest_arch
def CheckUlimit(self) -> 'UlimitResults':
"""Returns a UlimitResults from the host VM.
Do not cache these results, because many benchmarks change them.
The value can be different before and after runs.
"""
ulimit, _ = self.RemoteCommand('ulimit -a')
self._ulimit_cache = UlimitResults(ulimit)
return self._ulimit_cache
def CheckLsCpu(self):
"""Returns a LsCpuResults from the host VM."""
if not self._lscpu_cache:
lscpu, _ = self.RemoteCommand('lscpu')
self._lscpu_cache = LsCpuResults(lscpu)
return self._lscpu_cache
def CheckProcCpu(self, check_cache=True):
"""Returns a ProcCpuResults from the host VM."""
if not self._proccpu_cache or not check_cache:
proccpu, _ = self.RemoteCommand('cat /proc/cpuinfo')
self._proccpu_cache = ProcCpuResults(proccpu)
return self._proccpu_cache
def GetOsInfo(self) -> str:
"""Returns information regarding OS type and version."""
stdout, _ = self.RemoteCommand('grep PRETTY_NAME /etc/os-release')
return regex_util.ExtractGroup(OS_PRETTY_NAME_REGEXP, stdout)
@property
def os_info(self) -> str:
"""Get distribution-specific information."""
if not self._os_info:
self._os_info = self.GetOsInfo()
return self._os_info
@property
def kernel_release(self) -> KernelRelease:
"""Return kernel release number."""
if not self._kernel_release:
self._kernel_release = KernelRelease(
self.RemoteCommand('uname -r')[0].strip()
)
return self._kernel_release
@property
def kernel_command_line(self) -> str:
"""Return the kernel command line."""
if not self._kernel_command_line:
self._kernel_command_line = self.RemoteCommand('cat /proc/cmdline')[
0
].strip()
return self._kernel_command_line
@property
def cpu_arch(self) -> str:
"""Returns the CPU architecture of the VM."""
if not self._cpu_arch:
self._cpu_arch = self.RemoteCommand('uname -m')[0].strip()
return self._cpu_arch
@property
def partition_table(self) -> Dict[str, int]:
"""Return partition table information."""
if not self._partition_table:
cmd = 'sudo fdisk -l'
partition_tables = self.RemoteCommand(cmd)[0]
try:
self._partition_table = {
dev: int(size)
for (dev, size) in regex_util.ExtractAllMatches(
r'Disk\s*(.*):[\s\w\.]*,\s(\d*)\sbytes', partition_tables
)
}
except regex_util.NoMatchError:
# TODO(user): Use alternative methods to retrieve partition table.
logging.warning('Partition table not found with "%s".', cmd)
return {}
return self._partition_table
@vm_util.Retry(log_errors=False, poll_interval=1)
def WaitForBootCompletion(self):
"""Waits until the VM has booted."""
# Test for listening on the port first, because this will happen strictly
# first.
if (
FLAGS.cluster_boot_test_port_listening
and self.port_listening_time is None
):
self.TestConnectRemoteAccessPort()
self.port_listening_time = time.time()
boot_methods = self.boot_completion_ip_subset
if boot_methods == virtual_machine.BootCompletionIpSubset.DEFAULT:
# By default let GetConnectionIp decide which IP to use for SSH.
# Omitting ip_address falls back to GetConnectionIp in RemoteHostCommand.
self._WaitForSSH()
# We don't know if GetConnectionIP returned an internal or external IP,
# so we can set neither self.ssh_external_time nor self.ssh_internal_time.
# It will still set self.bootable_time below.
elif boot_methods == virtual_machine.BootCompletionIpSubset.EXTERNAL:
self._WaitForSshExternal()
elif boot_methods == virtual_machine.BootCompletionIpSubset.INTERNAL:
self._WaitForSshInternal()
elif boot_methods == virtual_machine.BootCompletionIpSubset.BOTH:
connect_threads = [
(self._WaitForSshExternal, [], {}),
(self._WaitForSshInternal, [], {}),
]
background_tasks.RunParallelThreads(connect_threads, len(connect_threads))
else:
raise ValueError(
'Unknown --boot_completion_ip_subset: '
+ self.boot_completion_ip_subset
)
if self.bootable_time is None:
self.bootable_time = time.time()
def _WaitForSshExternal(self):
assert self.boot_completion_ip_subset in (
virtual_machine.BootCompletionIpSubset.EXTERNAL,
virtual_machine.BootCompletionIpSubset.BOTH,
)
self._WaitForSSH(self.ip_address)
self.ssh_external_time = time.time()
def _WaitForSshInternal(self):
assert self.boot_completion_ip_subset in (
virtual_machine.BootCompletionIpSubset.INTERNAL,
virtual_machine.BootCompletionIpSubset.BOTH,
)
self._WaitForSSH(self.internal_ip)
self.ssh_internal_time = time.time()
@vm_util.Retry(log_errors=False, poll_interval=1)
def _WaitForSSH(self, ip_address: Union[str, None] = None):
"""Waits until the VM is ready."""
# Always wait for remote host command to succeed, because it is necessary to
# run benchmarks
resp, _ = self.RemoteHostCommand(
'hostname', retries=1, ip_address=ip_address
)
if self.hostname is None:
self.hostname = resp[:-1]
def RecordAdditionalMetadata(self):
"""After the VM has been prepared, store metadata about the VM."""
super().RecordAdditionalMetadata()
if not self.bootable_time:
logging.warning(
'RecordAdditionalMetadata: skipping additional metadata'
' capture due to an unreachable VM.'
)
return
self.tcp_congestion_control = self.TcpCongestionControl()
lscpu_results = self.CheckLsCpu()
self.numa_node_count = lscpu_results.numa_node_count
self.os_metadata['threads_per_core'] = lscpu_results.threads_per_core
self.os_metadata['os_info'] = self.os_info
self.os_metadata['kernel_release'] = str(self.kernel_release)
self.os_metadata['cpu_arch'] = self.cpu_arch
self.os_metadata.update(self.partition_table)
if FLAGS.append_kernel_command_line:
self.os_metadata['kernel_command_line'] = self.kernel_command_line
self.os_metadata['append_kernel_command_line'] = (
FLAGS.append_kernel_command_line
)
# TODO(pclay): consider publishing full lsmod as a sample. It's probably too
# spammy for metadata
if _KERNEL_MODULES_TO_ADD.value:
self.os_metadata['added_kernel_modules'] = ','.join(
_KERNEL_MODULES_TO_ADD.value
)
if _KERNEL_MODULES_TO_REMOVE.value:
self.os_metadata['removed_kernel_modules'] = ','.join(
_KERNEL_MODULES_TO_REMOVE.value
)
devices = self._get_network_device_mtus()
all_mtus = set(devices.values())
if len(all_mtus) > 1:
      logging.warning(
          'Expected a single unique MTU value, but found: %s. Recording the'
          ' MTU as a concatenation of values.',
          all_mtus,
      )
self.os_metadata['mtu'] = '-'.join(list(all_mtus))
elif not all_mtus:
      logging.warning('No network devices found.')
else:
self.os_metadata['mtu'] = list(all_mtus)[0]
def _get_network_device_mtus(self) -> Dict[str, str]:
"""Returns network device names and their MTUs."""
if not self._network_device_mtus:
stdout, _ = self.RemoteCommand('PATH="${PATH}":/usr/sbin ip link show up')
self._network_device_mtus = {}
for line in stdout.splitlines():
m = self._IP_LINK_RE_DEVICE_MTU.match(line)
if m:
device_name = m['device_name']
if not any(
device_name.startswith(prefix)
for prefix in self._IGNORE_NETWORK_DEVICE_PREFIXES
):
self._network_device_mtus[device_name] = m['mtu']
return self._network_device_mtus
@vm_util.Retry(log_errors=False, poll_interval=1)
def VMLastBootTime(self):
"""Returns the time the VM was last rebooted as reported by the VM.
See
https://unix.stackexchange.com/questions/165002/how-to-reliably-get-timestamp-at-which-the-system-booted.
"""
stdout, _ = self.RemoteHostCommand('stat -c %z /proc/', retries=1)
if stdout.startswith('1970-01-01'):
# Fix for ARM returning epochtime
date_fmt = '+%Y-%m-%d %H:%M:%S.%s %z'
date_cmd = "grep btime /proc/stat | awk '{print $2}'"
stdout, _ = self.RemoteHostCommand(f'date "{date_fmt}" -d@$({date_cmd})')
return stdout
def SnapshotPackages(self):
"""Grabs a snapshot of the currently installed packages."""
pass
def RestorePackages(self):
"""Restores the currently installed packages to those snapshotted."""
pass
def PackageCleanup(self):
"""Cleans up all installed packages.
Deletes the temp directory, restores packages, and uninstalls all
PerfKit packages.
"""
for package_name in self._installed_packages:
self.Uninstall(package_name)
self.RestorePackages()
self.RemoteCommand('sudo rm -rf %s' % linux_packages.INSTALL_DIR)
def GetPathToConfig(self, package_name):
"""Returns the path to the config file for PerfKit packages.
    This function is mostly useful when config file locations
    don't match across distributions (such as mysql). Packages don't
need to implement it if this is not the case.
"""
pass
def GetServiceName(self, package_name):
"""Returns the service name of a PerfKit package.
This function is mostly useful when service names don't
match across distributions (such as mongodb). Packages don't
need to implement it if this is not the case.
"""
pass
def IsMounted(self, mount_point: str, dev_path: str):
"""Returns whether the given mount point is mounted."""
# GCP disk uses a symlink for the device path, so we need to translate it
# to the actual device path. For other cloud providers, the device path
# should already be the actual device path and readlink will return the
# same path in the result.
stdout, _ = self.RemoteHostCommand(f'readlink -f {dev_path}')
device_path = stdout.strip()
stdout, _ = self.RemoteHostCommand(
f'mount | grep "{device_path} on {mount_point}" | wc -l'
)
return stdout and int(stdout) > 0
@vm_util.Retry()
def FormatDisk(self, device_path, disk_type=None):
"""Formats a disk attached to the VM."""
# Some images may automount one local disk, but we don't
# want to fail if this wasn't the case.
if disk.NFS == disk_type:
return
if disk.SMB == disk_type:
return
umount_cmd = '[[ -d /mnt ]] && sudo umount /mnt; '
# TODO(user): Allow custom disk formatting options.
if FLAGS.disk_fs_type == 'xfs':
block_size = FLAGS.disk_block_size or 512
fmt_cmd = 'sudo mkfs.xfs -f -i size={} {}'.format(block_size, device_path)
else:
block_size = FLAGS.disk_block_size or 4096
fmt_cmd = (
'sudo mke2fs -F -E lazy_itable_init=0,discard -O '
'^has_journal -t ext4 -b {} {}'.format(block_size, device_path)
)
self.os_metadata['disk_filesystem_type'] = FLAGS.disk_fs_type
self.os_metadata['disk_filesystem_blocksize'] = block_size
self.RemoteHostCommand(umount_cmd + fmt_cmd)
@vm_util.Retry(
timeout=vm_util.DEFAULT_TIMEOUT,
retryable_exceptions=(errors.VirtualMachine.RemoteCommandError,),
)
def MountDisk(
self,
device_path,
mount_path,
disk_type=None,
mount_options=disk.DEFAULT_MOUNT_OPTIONS,
fstab_options=disk.DEFAULT_FSTAB_OPTIONS,
):
"""Mounts a formatted disk in the VM."""
mount_options = '-o %s' % mount_options if mount_options else ''
if disk.NFS == disk_type:
mount_options = '-t nfs %s' % mount_options
fs_type = 'nfs'
elif disk.SMB == disk_type:
mount_options = '-t cifs %s' % mount_options
fs_type = 'smb'
else:
fs_type = FLAGS.disk_fs_type
fstab_options = fstab_options or ''
mnt_cmd = (
'sudo mkdir -p {mount_path};'
'sudo mount {mount_options} {device_path} {mount_path} && '
'sudo chown $USER:$USER {mount_path};'
).format(
mount_path=mount_path,
device_path=device_path,
mount_options=mount_options,
)
self.RemoteHostCommand(mnt_cmd)
# add to /etc/fstab to mount on reboot
mnt_cmd = (
'echo "{device_path} {mount_path} {fs_type} {fstab_options}" '
'| sudo tee -a /etc/fstab'
).format(
device_path=device_path,
mount_path=mount_path,
fs_type=fs_type,
fstab_options=fstab_options,
)
self.RemoteHostCommand(mnt_cmd)
def LogVmDebugInfo(self):
"""Logs the output of calling dmesg on the VM."""
if FLAGS.log_dmesg:
self.RemoteCommand('hostname && sudo dmesg')
def RemoteCopy(self, file_path, remote_path='', copy_to=True):
self.RemoteHostCopy(file_path, remote_path, copy_to)
def RemoteHostCopy(self, file_path, remote_path='', copy_to=True):
"""Copies a file to or from the VM.
Args:
file_path: Local path to file.
remote_path: Optional path of where to copy file on remote host.
copy_to: True to copy to vm, False to copy from vm.
Raises:
RemoteCommandError: If there was a problem copying the file.
"""
if vm_util.RunningOnWindows():
if ':' in file_path:
# scp doesn't like colons in paths.
file_path = file_path.split(':', 1)[1]
# Replace the last instance of '\' with '/' to make scp happy.
file_path = '/'.join(file_path.rsplit('\\', 1))
remote_ip = '[%s]' % self.GetConnectionIp()
remote_location = '%s@%s:%s' % (self.user_name, remote_ip, remote_path)
scp_cmd = ['scp', '-P', str(self.ssh_port), '-pr']
ssh_private_key = (
self.ssh_private_key if self.is_static else vm_util.GetPrivateKeyPath()
)
# Allow increasing the connection timeout since it can be useful.
scp_cmd.extend(
vm_util.GetSshOptions(
ssh_private_key, connect_timeout=FLAGS.scp_connect_timeout
)
)
simplified_cmd = ['scp']
if copy_to:
simplified_cmd.extend([file_path, remote_location])
scp_cmd.extend([file_path, remote_location])
else:
simplified_cmd.extend([remote_location, file_path])
scp_cmd.extend([remote_location, file_path])
logging.info(
'Copying file with simplified command: %s', ' '.join(simplified_cmd)
)
stdout, stderr, retcode = '', '', 1 # placate uninitialized variable checks
for _ in range(FLAGS.ssh_retries):
stdout, stderr, retcode = vm_util.IssueCommand(
scp_cmd, timeout=None, should_pre_log=False, raise_on_failure=False
)
# Retry on 255 because this indicates an SSH failure
if retcode != RETRYABLE_SSH_RETCODE:
break
# Recursive scp is generally not idempotent so retrying is not perfectly
# safe. However, if the copy target (ie. remote_path if copy_to is True)
# does not include a matching dirname as the source, then the scp _is_
# idempotent. Callers should try to set the copy target accordingly to
# avoid this pitfall.
#
# For example, `scp -r /tmp/foo_dir perfkit@host:some_dir/foo_dir` is NOT
# idempotent but `scp -r /tmp/foo_dir perfkit@host:some_dir` is.
#
# See:
# https://www.tsmean.com/articles/command-line-tools/scp-and-idempotency/
if retcode:
full_cmd = ' '.join(scp_cmd)
error_text = (
'Got non-zero return code (%s) executing %s\nSTDOUT: %sSTDERR: %s'
% (retcode, full_cmd, stdout, stderr)
)
raise errors.VirtualMachine.RemoteCommandError(error_text)
def RunCommand(
self,
command: str | list[str],
ignore_failure: bool = False,
should_pre_log: bool = True,
stack_level: int = 1,
timeout: float | None = None,
**kwargs: Any,
) -> tuple[str, str, int]:
"""Runs a command.
Additional args can be supplied & are passed to lower level functions but
aren't required.
Args:
command: A valid bash command in string or list form.
ignore_failure: Ignore any failure if set to true.
should_pre_log: Whether to print the command being run or not.
stack_level: Number of stack frames to skip & get an "interesting" caller,
for logging. 1 skips this function, 2 skips this & its caller, etc..
timeout: The time to wait in seconds for the command before exiting. None
means no timeout.
**kwargs: Additional command arguments.
Returns:
A tuple of stdout and stderr from running the command.
Raises:
RemoteCommandError: If there was a problem issuing the command.
"""
if not isinstance(command, str):
cmd_str = ' '.join(command)
else:
cmd_str = command
stack_level += 1
if 'raise_on_failure' in kwargs:
ignore_failure = not kwargs.pop('raise_on_failure')
if 'env' in kwargs:
env_vars = kwargs.pop('env')
for var_name, var_value in env_vars.items():
cmd_str = f'export {var_name}={var_value} && {cmd_str}'
return self.RemoteCommandWithReturnCode(
cmd_str,
ignore_failure=ignore_failure,
should_pre_log=should_pre_log,
stack_level=stack_level,
timeout=timeout,
**kwargs,
)
def RemoteCommand(self, *args, **kwargs) -> Tuple[str, str]:
"""Runs a command on the VM.
Args:
*args: Arguments passed directly to RemoteCommandWithReturnCode.
**kwargs: Keyword arguments passed directly to
RemoteCommandWithReturnCode.
Returns:
A tuple of stdout, stderr from running the command.
Raises:
RemoteCommandError: If there was a problem establishing the connection.
"""
kwargs = _IncrementStackLevel(**kwargs)
return self.RemoteCommandWithReturnCode(*args, **kwargs)[:2]
def RemoteCommandWithReturnCode(
self, *args, **kwargs
) -> Tuple[str, str, int]:
"""Runs a command on the VM.
Args:
*args: Arguments passed directly to RemoteHostCommandWithReturnCode.
**kwargs: Keyword arguments passed directly to
RemoteHostCommandWithReturnCode.
Returns:
A tuple of stdout, stderr, return_code from running the command.
Raises:
RemoteCommandError: If there was a problem establishing the connection.
"""
kwargs = _IncrementStackLevel(**kwargs)
return self.RemoteHostCommandWithReturnCode(*args, **kwargs)
def RemoteHostCommandWithReturnCode(
self,
command: str,
retries: int | None = None,
ignore_failure: bool = False,
login_shell: bool = False,
disable_tty_lock: bool = False,
timeout: float | None = None,
ip_address: str | None = None,
should_pre_log: bool = True,
stack_level: int = 1,
) -> Tuple[str, str, int]:
"""Runs a command on the VM.
This is guaranteed to run on the host VM, whereas RemoteCommand might run
within i.e. a container in the host VM.
Args:
command: A valid bash command.
retries: The maximum number of times RemoteCommand should retry SSHing
when it receives a 255 return code. If None, it defaults to the value of
the flag ssh_retries.
ignore_failure: Ignore any failure if set to true.
login_shell: Run command in a login shell.
disable_tty_lock: Disables TTY lock. Multiple commands will try to take
control of the terminal.
timeout: The timeout for IssueCommand.
ip_address: The ip address to use to connect to host. If None, uses
self.GetConnectionIp()
should_pre_log: Whether to output a "Running command" log statement.
stack_level: Number of stack frames to skip & get an "interesting" caller,
for logging. 1 skips this function, 2 skips this & its caller, etc..
Returns:
A tuple of stdout, stderr, return_code from running the command.
Raises:
RemoteCommandError: If there was a problem establishing the connection.
"""
stack_level += 1
if retries is None:
retries = FLAGS.ssh_retries
if vm_util.RunningOnWindows():
# Multi-line commands passed to ssh won't work on Windows unless the
# newlines are escaped.
command = command.replace('\n', '\\n')
if ip_address is None:
ip_address = self.GetConnectionIp()
user_host = '%s@%s' % (self.user_name, ip_address)
ssh_cmd = ['ssh', '-A', '-p', str(self.ssh_port), user_host]
ssh_private_key = (
self.ssh_private_key if self.is_static else vm_util.GetPrivateKeyPath()
)
ssh_cmd.extend(vm_util.GetSshOptions(ssh_private_key))
# TODO(yuyanting): Revisit implementing with "-o ProxyJump".
# Current proxy implementation relies on ssh_config file being generated,
# which happens at the end of the Provision stage. This causes circular
    # dependency for regular VMs (and thus this is only used in
    # cluster-provisioned VMs).
if self.proxy_jump:
ssh_cmd = [
'ssh',
'-F',
os.path.join(vm_util.GetTempDir(), 'ssh_config'),
self.name,
]
if should_pre_log:
logger.info(
'Running on %s via ssh: %s',
self.name,
command,
stacklevel=stack_level,
)
stdout, stderr, retcode = '', '', 1 # placate uninitialized variable checks
try:
if login_shell:
ssh_cmd.extend(['-t', '-t', 'bash -l -c "%s"' % command])
if not disable_tty_lock:
self._pseudo_tty_lock.acquire()
else:
ssh_cmd.append(command)
for _ in range(retries):
stdout, stderr, retcode = vm_util.IssueCommand(
ssh_cmd,
timeout=timeout,
should_pre_log=False,
raise_on_failure=False,
stack_level=stack_level,
)
# Retry on 255 because this indicates an SSH failure
if retcode != RETRYABLE_SSH_RETCODE:
break
finally:
if login_shell and not disable_tty_lock:
self._pseudo_tty_lock.release()
if retcode:
full_cmd = ' '.join(ssh_cmd)
error_text = (
'Got non-zero return code (%s) executing %s\n'
'Full command: %s\nSTDOUT: %sSTDERR: %s'
% (retcode, command, full_cmd, stdout, stderr)
)
if not ignore_failure:
raise errors.VirtualMachine.RemoteCommandError(error_text)
return (stdout, stderr, retcode)
def RemoteHostCommand(self, *args, **kwargs) -> Tuple[str, str]:
"""Runs a command on the VM.
This is guaranteed to run on the host VM, whereas RemoteCommand might run
within i.e. a container in the host VM.
Args:
*args: Arguments passed directly to RemoteHostCommandWithReturnCode.
**kwargs: Keyword arguments passed directly to
RemoteHostCommandWithReturnCode.
Returns:
A tuple of stdout, stderr from running the command.
Raises:
RemoteCommandError: If there was a problem establishing the connection.
"""
kwargs = _IncrementStackLevel(**kwargs)
return self.RemoteHostCommandWithReturnCode(*args, **kwargs)[:2]
def _CheckRebootability(self):
if not self.IS_REBOOTABLE:
raise errors.VirtualMachine.VirtualMachineError(
"Trying to reboot a VM that isn't rebootable."
)
def _Reboot(self):
"""OS-specific implementation of reboot command."""
self._CheckRebootability()
self.RemoteCommand('sudo reboot', ignore_failure=True)
def _AfterReboot(self):
"""Performs any OS-specific setup on the VM following reboot.
This will be called after every call to Reboot().
"""
# redetect os metadata as it might have changed
self._os_info = None
self._kernel_release = None
self._kernel_command_line = None
self._lscpu_cache = None
self.RecordAdditionalMetadata()
if self.install_packages:
self._CreateInstallDir()
self._CreateVmTmpDir()
self._SetTransparentHugepages()
self._DisableCstates()
self._has_remote_command_script = False
self._DisableCpus()
def MoveFile(self, target, source_path, remote_path=''):
self.MoveHostFile(target, source_path, remote_path)
def MoveHostFile(self, target, source_path, remote_path=''):
"""Copies a file from one VM to a target VM.
Args:
target: The target BaseVirtualMachine object.
source_path: The location of the file on the REMOTE machine.
remote_path: The destination of the file on the TARGET machine, default is
the home directory.
"""
self.AuthenticateVm()
# TODO(user): For security we may want to include
# -o UserKnownHostsFile=/dev/null in the scp command
# however for the moment, this has happy side effects
# ie: the key is added to know known_hosts which allows
# OpenMPI to operate correctly.
remote_location = '%s@%s:%s' % (
target.user_name,
target.ip_address,
remote_path,
)
self.RemoteHostCommand(
'scp -P %s -o StrictHostKeyChecking=no -i %s %s %s'
% (target.ssh_port, REMOTE_KEY_PATH, source_path, remote_location)
)
def AuthenticateVm(self):
"""Authenticate a remote machine to access all peers."""
if not self.has_private_key:
if not self.is_static:
self.RemoteHostCopy(vm_util.GetPrivateKeyPath(), REMOTE_KEY_PATH)
elif self.ssh_private_key and FLAGS.copy_ssh_private_keys_into_static_vms:
logging.warning('Copying ssh private keys into static VMs')
self.RemoteHostCopy(self.ssh_private_key, REMOTE_KEY_PATH)
else:
logging.warning(
'No key sharing for static VMs with'
' --copy_ssh_private_keys_into_static_vms=False'
)
return
self.RemoteCommand(
'echo "Host *\n StrictHostKeyChecking no\n" > ~/.ssh/config'
)
self.RemoteCommand('chmod 600 ~/.ssh/config')
self.has_private_key = True
def TestAuthentication(self, peer):
"""Tests whether the VM can access its peer.
Raises:
AuthError: If the VM cannot access its peer.
"""
if not self.TryRemoteCommand('ssh %s hostname' % peer.internal_ip):
raise errors.VirtualMachine.AuthError(
'Authentication check failed. If you are running with Static VMs, '
'please make sure that %s can ssh into %s without supplying any '
'arguments except the ip address.' % (self, peer)
)
def CheckJavaVersion(self):
"""Check the version of java on remote machine.
Returns:
The version of Java installed on remote machine.
"""
version, _ = self.RemoteCommand(
"java -version 2>&1 >/dev/null | grep version | awk '{print $3}'"
)
return version[:-1]
def RemoveFile(self, filename):
"""Deletes a file on a remote machine.
Args:
filename: Path to the file to delete.
"""
self.RemoteCommand('sudo rm -rf %s' % filename)
def GetDeviceSizeFromPath(self, path):
"""Gets the size of the a drive that contains the path specified.
Args:
path: The function will return the amount of space on the file system that
contains this file name.
Returns:
The size in 1K blocks of the file system containing the file.
"""
df_command = "df -k -P %s | tail -n +2 | awk '{ print $2 }'" % path
stdout, _ = self.RemoteCommand(df_command)
return int(stdout)
def DropCaches(self):
"""Drops the VM's caches."""
drop_caches_command = 'sudo /sbin/sysctl vm.drop_caches=3'
self.RemoteCommand(drop_caches_command)
def _RemoteFileExists(self, file_path: str) -> bool:
"""Returns true if the file exists on the VM."""
stdout, _ = self.RemoteCommand(
f'ls {file_path} >> /dev/null 2>&1 || echo file_not_exist'
)
return not stdout
def GetCpusAllowedSet(self) -> set[int]:
r"""Returns the list of CPUs allowed for the current process.
Processes the output of the following files in order:
1. /sys/fs/cgroup/cpuset.cpus.effective
2. /proc/self/status
3. /proc/cpuinfo
    Example outputs:
$ cat XX/cpuset.cpus.effective :
0-23
$ cat /proc/self/status | grep Cpus_allowed_list |cut -d: -f2
0-23
$ cat /proc/cpuinfo |sed -e 's/[[:blank:]]*//g' | grep ^processor
| cut -d: -f2 |tr '\n' ','
0,1,2,3
Returns:
A set of CPUs allowed for the current process.
"""
if self._RemoteFileExists('/sys/fs/cgroup/cpuset.cpus.effective'):
stdout, _ = self.RemoteCommand('cat /sys/fs/cgroup/cpuset.cpus.effective')
elif self._RemoteFileExists('/proc/self/status'):
stdout, _ = self.RemoteCommand(
'cat /proc/self/status | grep Cpus_allowed_list |cut -d: -f2'
)
elif self._RemoteFileExists('/proc/cpuinfo'):
cmd = 'cat /proc/cpuinfo '
cmd += r"| sed -n 's/processor\s*\:\s*\([0-9]*\)/\1/p'"
cmd += '| paste -sd,'
stdout, _ = self.RemoteCommand(cmd)
else:
raise ValueError(
'GetCpusAllowedSet failed, cannot read'
' /sys/fs/cgroup/cpuset.cpus.effective, /proc/self/status, or'
' /proc/cpuinfo.'
)
return ParseRangeList(stdout)
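  # For instance, a container restricted by cgroups to CPUs '0-3,8'
  # (hypothetical) yields GetCpusAllowedSet() == {0, 1, 2, 3, 8}, parsed via
  # ParseRangeList above.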
def _GetNumCpus(self):
"""Returns the number of logical CPUs on the VM.
If the flag `use_numcpu_multi_files` is true,
call GetCpusAllowedSet function to help calculate the number of CPUs.
Otherwise, extracts the value from `/proc/cpuinfo` file.
This method does not cache results (unlike "num_cpus").
"""
if FLAGS.use_numcpu_multi_files:
return len(self.GetCpusAllowedSet())
stdout, _ = self.RemoteCommand('cat /proc/cpuinfo | grep processor | wc -l')
return int(stdout)
def _GetTotalFreeMemoryKb(self):
"""Calculate amount of free memory in KB of the given vm.
Free memory is calculated as sum of free, cached, and buffers
as output from /proc/meminfo.
Returns:
free memory on the vm in KB
"""
stdout, _ = self.RemoteCommand("""
awk '
BEGIN {total =0}
/MemFree:/ {total += $2}
/Cached:/ {total += $2}
/Buffers:/ {total += $2}
END {printf "%d",total/1024}
' /proc/meminfo
""")
return int(stdout) * 1024
def _GetTotalMemoryKbFromCgroup(self):
"""Extracts the memory space in kibibyte (KiB) for containers.
Gets the memory capacity from
/sys/fs/cgroup/memory/<container>/memory.limit_in_bytes,
or /sys/fs/cgroup/memory/memory.limit_in_bytes.
    For example:
$ cat /sys/fs/cgroup/memory/container/memory.limit_in_bytes
1024
Returns:
The memory capacity in kibibyte (KiB).
Raises:
ValueError: If not found /proc/self/cgroup,
or /sys/fs/cgroup/memory/<container>/memory.limit_in_bytes,
or /sys/fs/cgroup/memory/memory.limit_in_bytes.
"""
if self._RemoteFileExists('/proc/self/cgroup'):
container_name, _ = self.RemoteCommand(
"grep memory /proc/self/cgroup |cut -d ':' -f 3 |sed -e 's:^/::'"
)
container_name = container_name.replace('\n', '')
else:
raise ValueError(
'_GetTotalMemoryKbFromCgroup failed, cannot read /proc/self/cgroup.'
)
if self._RemoteFileExists(
f'/sys/fs/cgroup/memory/{container_name}/memory.limit_in_bytes'
):
stdout, _ = self.RemoteCommand(
f'cat /sys/fs/cgroup/memory/{container_name}/memory.limit_in_bytes'
)
return int(stdout) // 1024
elif self._RemoteFileExists('/sys/fs/cgroup/memory/memory.limit_in_bytes'):
stdout, _ = self.RemoteCommand(
'cat /sys/fs/cgroup/memory/memory.limit_in_bytes'
)
return int(stdout) // 1024
raise ValueError(
'_GetTotalMemoryKbFromCgroup failed, cannot read'
' /sys/fs/cgroup/memory/%s/memory.limit_in_bytes or'
' /sys/fs/cgroup/memory/memory.limit_in_bytes' % container_name
)
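# Worked example (hypothetical cgroup value): a memory.limit_in_bytes of
# 2147483648 (2 GiB) yields 2147483648 // 1024 = 2097152 KiB.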
def _GetTotalMemoryKb(self):
"""Returns the amount of physical memory on the VM in Kilobytes (KiB).
if the flag `use_cgroup_memory_limits` is true, return
the minimum of cgroup memory capacity and the VM capacity.
Otherwise, extracts the memory capacity using /proc/meminfo.
This method does not cache results (unlike "total_memory_kb").
"""
meminfo_command = "cat /proc/meminfo | grep MemTotal | awk '{print $2}'"
stdout, _ = self.RemoteCommand(meminfo_command)
meminfo_memory_kb = int(stdout)
if FLAGS.use_cgroup_memory_limits:
return min(self._GetTotalMemoryKbFromCgroup(), meminfo_memory_kb)
return meminfo_memory_kb
def _TestReachable(self, ip):
"""Returns True if the VM can reach the ip address and False otherwise."""
return self.TryRemoteCommand('ping -c 1 %s' % ip)
def SetupLocalDisks(self):
"""Performs Linux specific setup of local disks."""
local_disks = []
for d in self.scratch_disks:
if d.disk_type == disk.LOCAL and d.IsNvme():
if isinstance(d, disk.StripedDisk):
local_disks += d.disks
else:
local_disks.append(d)
if _ENABLE_NVME_INTERRUPT_COALEASING.value:
self._EnableInterruptCoaleasing(local_disks)
def _EnableInterruptCoaleasing(self, local_disks):
if not _ENABLE_NVME_INTERRUPT_COALEASING.value:
return
self.os_metadata['interrupt_coaleasing'] = True
self.InstallPackages('nvme-cli')
for d in local_disks:
path = d.GetDevicePath()
self.RemoteCommand(
f'sudo nvme --set-feature --feature-id=8 --value=0x101 {path}'
)
def hasStripedDiskDevice(self, dev_name: str) -> bool:
"""Checks if the striped disk device exists or not.
Args:
dev_name: The name of the device.
Returns:
True if the striped disk device exists.
"""
# Suppress the error, as a failure of this command does not block the test.
# The command succeeds (exit code 0) if the striped device exists.
_, _, return_code = self.RemoteHostCommandWithReturnCode(
f'sudo mdadm "{dev_name}"', ignore_failure=True
)
# Return True if the command succeeds, otherwise False.
return return_code == 0
def StripeDisks(self, devices, striped_device):
"""Raids disks together using mdadm.
Args:
devices: A list of device paths that should be striped together.
striped_device: The path to the device that will be created.
"""
self.Install('mdadm')
stripe_cmd = (
'yes | sudo mdadm --create %s --level=stripe --raid-devices=%s %s'
% (striped_device, len(devices), ' '.join(devices))
)
self.RemoteHostCommand(stripe_cmd)
# Save the RAID layout on the disk
self.RemoteHostCommand('sudo mkdir -p /etc/mdadm')
self.RemoteHostCommand('sudo touch /etc/mdadm/mdadm.conf')
cmd = 'sudo mdadm --detail --scan | sudo tee -a /etc/mdadm/mdadm.conf'
self.RemoteHostCommand(cmd)
# Make the disk available during reboot for VMs running Debian based Linux
if self.OS_TYPE != os_types.RHEL8:
init_ram_fs_cmd = self.INIT_RAM_FS_CMD
self.RemoteHostCommand(init_ram_fs_cmd)
# Automatically mount the disk after reboot
cmd = (
"echo '/dev/md0 /mnt/md0 ext4 defaults,nofail"
",discard 0 0' | sudo tee -a /etc/fstab"
)
self.RemoteHostCommand(cmd)
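# Illustrative example (hypothetical device paths): striping two local NVMe
# disks into /dev/md0 runs
#   yes | sudo mdadm --create /dev/md0 --level=stripe --raid-devices=2 \
#       /dev/nvme0n1 /dev/nvme1n1
# and then persists the layout in /etc/mdadm/mdadm.conf and /etc/fstab.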
def IsDiskFormatted(self, dev_name, num_partitions):
"""Checks if the disk is formatted.
Args:
dev_name: The name of the device.
num_partitions: The number of new partitions to create.
Returns:
True if the disk is already formatted with given number of partitions.
"""
# Check how many partitions are already created for the given disk.
ret, _ = self.RemoteHostCommand(
f'ls /dev/disk/by-id/ | grep "{dev_name}-part" | wc -l'
)
return ret and int(ret) == num_partitions
def PartitionDisk(self, dev_name, dev_path, num_partitions, partition_size):
"""Partitions the disk into smaller pieces.
Args:
dev_name: The name of the device.
dev_path: The device path that should be partitioned.
num_partitions: The number of new partitions to create.
partition_size: The size of each partition. The last partition will use
the rest of the device space.
Returns:
A list of partition paths.
"""
# Install sfdisk from util-linux and partprobe from parted to
# partition disks and refresh partition table.
self.InstallPackages('util-linux')
self.InstallPackages('parted')
# Set the disk label to gpt.
self.RemoteHostCommand(f"echo 'label: gpt' | sudo sfdisk {dev_path}")
disks = []
# Create and append new partitions (except the last one) to the disk.
for part_id in range(num_partitions - 1):
self.RemoteHostCommand(
'echo ",%s,L" | sudo sfdisk %s -f --append'
% (partition_size, dev_path)
)
new_partition_name = f'{dev_name}-part{part_id+1}'
disks.append(new_partition_name)
# Use the remaining space to create and append the last partition.
self.RemoteHostCommand(f'echo ",,L" | sudo sfdisk {dev_path} -f --append')
# Refresh the partition table.
self.RemoteHostCommand(f'sudo partprobe {dev_path}')
disks.append(f'{dev_name}-part{num_partitions}')
return disks
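# Illustrative example (hypothetical arguments): PartitionDisk('nvme0n1',
# '/dev/nvme0n1', 3, '10G') appends two 10G partitions, uses the remaining
# space for the third, and returns
# ['nvme0n1-part1', 'nvme0n1-part2', 'nvme0n1-part3'].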
def BurnCpu(self, burn_cpu_threads=None, burn_cpu_seconds=None):
"""Burns vm cpu for some amount of time and dirty cache.
Args:
burn_cpu_threads: Number of threads to burn cpu.
burn_cpu_seconds: Amount of time in seconds to burn cpu.
"""
burn_cpu_threads = burn_cpu_threads or FLAGS.burn_cpu_threads
burn_cpu_seconds = burn_cpu_seconds or FLAGS.burn_cpu_seconds
if burn_cpu_seconds:
self.InstallPackages('sysbench')
end_time = time.time() + burn_cpu_seconds
self.RemoteCommand(
'nohup sysbench --num-threads=%s --test=cpu --cpu-max-prime=10000000 '
'run 1> /dev/null 2> /dev/null &' % burn_cpu_threads
)
if time.time() < end_time:
time.sleep(end_time - time.time())
self.RemoteCommand('pkill -9 sysbench')
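# Usage sketch (hypothetical values): vm.BurnCpu(burn_cpu_threads=4,
# burn_cpu_seconds=60) starts sysbench in the background, sleeps until the
# 60 seconds have elapsed, and then kills sysbench with pkill -9.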
def SetSmpAffinity(self):
"""Set SMP IRQ affinity."""
if self._smp_affinity_script:
self.PushDataFile(self._smp_affinity_script)
self.RemoteCommand('sudo bash %s' % self._smp_affinity_script)
else:
raise NotImplementedError()
def SetReadAhead(self, num_sectors, devices):
"""Set read-ahead value for block devices.
Args:
num_sectors: int. Number of sectors of read ahead.
devices: list of strings. A list of block devices.
"""
self.RemoteCommand(
'sudo blockdev --setra {0} {1}; sudo blockdev --setfra {0} {1};'.format(
num_sectors, ' '.join(devices)
)
)
def RecoverChunkedPreprovisionedData(self, path, filename):
"""Recover chunked preprovisioned data."""
self.RemoteCommand(
f'cd {path} && cat {filename}_*.part > {filename} && '
f'rm {filename}_*.part'
)
def GetSha256sum(self, path, filename):
"""Gets the sha256sum hash for a filename in a path on the VM.
Args:
path: string; Path on the VM.
filename: string; Name of the file in the path.
Returns:
string; The sha256sum hash.
"""
stdout, _ = self.RemoteCommand(
'sha256sum %s' % posixpath.join(path, filename)
)
sha256sum, _ = stdout.split()
return sha256sum
def AppendKernelCommandLine(self, command_line, reboot=True):
"""Appends the provided command-line to the VM and reboots by default.
This method should be overridden by the desired Linux flavor to be useful.
Most (all?) Linux flavors modify the kernel command line by updating the
GRUB configuration files and rebooting.
Args:
command_line: The string to append to the kernel command line.
reboot: Whether or not to reboot to have the change take effect.
"""
raise NotImplementedError(
'Kernel command-line appending for given Linux flavor not implemented.'
)
def _DoAppendKernelCommandLine(self):
"""If the flag is set, attempts to append the provided kernel command line.
In addition, to consolidate reboots during VM prepare, this method sets the
needs reboot bit instead of immediately rebooting.
"""
if FLAGS.disable_smt and self.CheckLsCpu().threads_per_core != 1:
FLAGS.append_kernel_command_line = (
' '.join((FLAGS.append_kernel_command_line, 'nosmt'))
if FLAGS.append_kernel_command_line
else 'nosmt'
)
if FLAGS.append_kernel_command_line:
self.AppendKernelCommandLine(
FLAGS.append_kernel_command_line, reboot=False
)
self._needs_reboot = True
def ModifyKernelModules(self):
"""Add or remove kernel modules based on flags."""
for module in _KERNEL_MODULES_TO_ADD.value:
self.RemoteCommand(f'sudo modprobe {module}')
for module in _KERNEL_MODULES_TO_REMOVE.value:
self.RemoteCommand(f'sudo modprobe -r {module}')
@abc.abstractmethod
def InstallPackages(self, packages: str) -> None:
"""Installs packages using the OS's package manager."""
pass
def _IsSmtEnabled(self):
"""Whether simultaneous multithreading (SMT) is enabled on the vm.
Looks for the "nosmt" attribute in the booted linux kernel command line
parameters.
Returns:
Whether SMT is enabled on the vm.
"""
return not bool(re.search(r'\bnosmt\b', self.kernel_command_line))
@property
def cpu_vulnerabilities(self) -> CpuVulnerabilities:
"""Returns a CpuVulnerabilities of CPU vulnerabilities.
Output of "grep . .../cpu/vulnerabilities/*" looks like this:
/sys/devices/system/cpu/vulnerabilities/itlb_multihit:KVM: Vulnerable
/sys/devices/system/cpu/vulnerabilities/l1tf:Mitigation: PTE Inversion
Which gets turned into
CpuVulnerabilities(vulnerabilities={'itlb_multihit': 'KVM'},
mitigations= {'l1tf': 'PTE Inversion'})
"""
text, _ = self.RemoteCommand(
'sudo grep . /sys/devices/system/cpu/vulnerabilities/*',
ignore_failure=True,
)
vuln = CpuVulnerabilities()
if not text:
logging.warning('No text response when getting CPU vulnerabilities')
return vuln
for line in text.splitlines():
vuln.AddLine(line)
return vuln
def GetNVMEDeviceInfo(self):
"""Get the NVME disk device info, by querying the VM."""
self.InstallPackages('nvme-cli')
version_str, _ = self.RemoteCommand('sudo nvme --version')
version_num = version_str.split()[2]
# TODO(arushigaur): Version check can be removed and we can just parse
# the raw output.
if packaging_version.parse(version_num) >= packaging_version.parse(
'1.5'
) and packaging_version.parse(version_num) < packaging_version.parse(
'2.11'
):
stdout, _ = self.RemoteCommand('sudo nvme list --output-format json')
if not stdout:
return []
response = json.loads(stdout)
return response.get('Devices', [])
else:
# Custom parsing for nvme-cli versions outside the [1.5, 2.11) range,
# whose list output formats differ.
response = []
stdout, _ = self.RemoteCommand('sudo nvme list')
if 'No NVMe devices detected' in stdout:
return []
rows = stdout.splitlines()
delimiter_row = rows[1] # row 0 is the column headers
delimiter_index = [0] + [
i for i in range(len(delimiter_row)) if delimiter_row[i] == ' '
]
for row in rows[2:]:
device = {}
device_info = [
row[i:j]
for i, j in zip(delimiter_index, delimiter_index[1:] + [None])
]
device['DevicePath'] = device_info[0].strip()
device['SerialNumber'] = device_info[1].strip()
device['ModelNumber'] = device_info[2].strip()
response.append(device)
return response
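# Illustrative example of the fixed-width fallback parsing (hypothetical
# output): given rows such as
#   Node          SN      Model
#   ------------- ------- ---------
#   /dev/nvme0n1  S123    nvme_card
# the spaces in the delimiter row mark the column boundaries, and each data
# row is sliced into the DevicePath, SerialNumber, and ModelNumber fields.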
def GenerateAndCaptureLogs(self) -> list[str]:
"""Generates and/or captures logs for this VM and returns the paths.
Currently supports syslog and journalctl, and/or sos reports depending on
what the VM supports.
Returns:
A list of paths where the logs are stored on the caller's machine.
"""
log_files = []
# syslog
try:
syslog_path = vm_util.PrependTempDir('syslog')
self.RemoteCopy(syslog_path, '/var/log/syslog', copy_to=False)
log_files.append(syslog_path)
except errors.VirtualMachine.RemoteCommandError:
logging.warning('Failed to capture VM syslog on %s', self.name)
# journalctl
try:
journalctl_path = vm_util.PrependTempDir('journalctl')
self.RemoteCommand('sudo journalctl --no-pager > /tmp/journalctl.tmp')
self.PullFile(journalctl_path, '/tmp/journalctl.tmp')
log_files.append(journalctl_path)
except errors.VirtualMachine.RemoteCommandError:
logging.warning('Failed to capture VM journalctl on %s', self.name)
# sos report
sosreport_local_path = vm_util.PrependTempDir('sosreport.tar.xz')
if self.GenerateAndCaptureSosReport(sosreport_local_path):
log_files.append(sosreport_local_path)
# Serial port 1 (console)
serial_port_1_path = vm_util.PrependTempDir('serial_port_1')
if self.GenerateAndCaptureSerialPortOutput(serial_port_1_path):
log_files.append(serial_port_1_path)
return log_files
def GenerateAndCaptureSosReport(self, local_path: str) -> bool:
"""Generates an sos report for the remote VM and captures it.
Following the instructions at:
https://cloud.google.com/container-optimized-os/docs/how-to/sosreport
Args:
local_path: The path to store the sos report on the caller's machine.
Returns:
True if the sos report was successfully generated and captured;
False otherwise.
"""
try:
self.RemoteCommandWithReturnCode(
'sudo sos report --all-logs --batch --tmp-dir=/tmp'
)
except errors.VirtualMachine.RemoteCommandError:
logging.warning('Failed to generate sos report on %s', self.name)
return False
sosreport_path = '/tmp/sosreport-*.tar.xz'
# The report is owned by root and is not readable by other users, so we
# need to change the permissions to copy it.
self.RemoteCommand(f'sudo chmod o+r {sosreport_path}')
self.RemoteCopy(local_path, sosreport_path, copy_to=False)
return True
def GenerateAndCaptureSerialPortOutput(self, local_path: str) -> bool:
"""Generates and captures the serial port output for the remote VM.
Implemented per-provider.
Args:
local_path: The path to store the serial port output on the caller's
machine.
Returns:
True if the serial port output was successfully generated and captured;
False otherwise.
"""
logging.warning(
'Capturing serial port output is not implemented for this VM.'
)
return False
def _IncrementStackLevel(**kwargs: Any) -> Any:
"""Increments the stack_level variable stored in kwargs."""
if 'stack_level' in kwargs:
kwargs['stack_level'] += 1
else:
# Default to 2 - one for the helper function this is called from, and one
# for RemoteHostCommandWithReturnCode.
kwargs['stack_level'] = 2
return kwargs
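# Usage sketch: a helper that wraps RemoteHostCommandWithReturnCode can call
#   kwargs = _IncrementStackLevel(**kwargs)
# before delegating, so that log messages are attributed to the helper's
# caller rather than to the wrapper itself.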
class ClearMixin(BaseLinuxMixin):
"""Class holding Clear Linux specific VM methods and attributes."""
OS_TYPE = os_types.CLEAR
BASE_OS_TYPE = os_types.CLEAR
def OnStartup(self):
"""Eliminates the need to have a tty to run sudo commands."""
super().OnStartup()
self.RemoteHostCommand('sudo swupd autoupdate --disable')
self.RemoteHostCommand('sudo mkdir -p /etc/sudoers.d')
self.RemoteHostCommand(
"echo 'Defaults:{} !requiretty' | sudo tee /etc/sudoers.d/pkb".format(
self.user_name
),
login_shell=True,
)
def PackageCleanup(self):
"""Cleans up all installed packages.
Performs the normal package cleanup, then deletes the file
added to the /etc/sudoers.d directory during startup.
"""
super().PackageCleanup()
self.RemoteCommand('sudo rm /etc/sudoers.d/pkb')
def SnapshotPackages(self):
"""See base class."""
self.RemoteCommand(
'sudo swupd bundle-list > {}/bundle_list'.format(
linux_packages.INSTALL_DIR
)
)
def RestorePackages(self):
"""See base class."""
self.RemoteCommand(
'sudo swupd bundle-list | grep --fixed-strings --line-regexp'
' --invert-match --file {}/bundle_list | xargs --no-run-if-empty sudo'
' swupd bundle-remove'.format(linux_packages.INSTALL_DIR),
ignore_failure=True,
)
def HasPackage(self, package):
"""Returns True iff the package is available for installation."""
return self.TryRemoteCommand(
'sudo swupd bundle-list --all | grep {}'.format(package)
)
def InstallPackages(self, packages: str) -> None:
"""Installs packages using the swupd bundle manager."""
self.RemoteCommand('sudo swupd bundle-add {}'.format(packages))
def Install(self, package_name):
"""Installs a PerfKit package on the VM."""
if not self.install_packages:
return
if package_name not in self._installed_packages:
package = linux_packages.PACKAGES[package_name]
if hasattr(package, 'SwupdInstall'):
package.SwupdInstall(self)
elif hasattr(package, 'Install'):
package.Install(self)
else:
raise KeyError(
'Package {} has no install method for Clear Linux.'.format(
package_name
)
)
self._installed_packages.add(package_name)
def Uninstall(self, package_name):
"""Uninstalls a PerfKit package on the VM."""
package = linux_packages.PACKAGES[package_name]
if hasattr(package, 'SwupdUninstall'):
package.SwupdUninstall(self)
elif hasattr(package, 'Uninstall'):
package.Uninstall(self)
def GetPathToConfig(self, package_name):
"""See base class."""
package = linux_packages.PACKAGES[package_name]
return package.SwupdGetPathToConfig(self)
def GetServiceName(self, package_name):
"""See base class."""
package = linux_packages.PACKAGES[package_name]
return package.SwupdGetServiceName(self)
def GetOsInfo(self):
"""See base class."""
stdout, _ = self.RemoteCommand('swupd info | grep Installed')
return 'Clear Linux build: {}'.format(
regex_util.ExtractGroup(CLEAR_BUILD_REGEXP, stdout)
)
def SetupProxy(self):
"""Sets up proxy configuration variables for the cloud environment."""
super().SetupProxy()
profile_file = '/etc/profile'
commands = []
if FLAGS.http_proxy:
commands.append(
"echo 'export http_proxy=%s' | sudo tee -a %s"
% (FLAGS.http_proxy, profile_file)
)
if FLAGS.https_proxy:
commands.append(
"echo 'https_proxy=%s' | sudo tee -a %s"
% (FLAGS.https_proxy, profile_file)
)
if FLAGS.ftp_proxy:
commands.append(
"echo 'ftp_proxy=%s' | sudo tee -a %s"
% (FLAGS.ftp_proxy, profile_file)
)
if FLAGS.no_proxy:
commands.append(
"echo 'export no_proxy=%s' | sudo tee -a %s"
% (FLAGS.no_proxy, profile_file)
)
if commands:
self.RemoteCommand(';'.join(commands))
def RemoteCommand(self, command, **kwargs):
"""Runs a command inside the container.
Args:
command: Arguments passed directly to RemoteHostCommandWithReturnCode.
**kwargs: Keyword arguments passed directly to
RemoteHostCommandWithReturnCode.
Returns:
A tuple of stdout and stderr from running the command.
"""
# Escapes bash sequences
command = '. /etc/profile; %s' % (command)
return self.RemoteHostCommand(command, **kwargs)[:2]
class BaseContainerLinuxMixin(BaseLinuxMixin):
"""Class holding VM methods for minimal container-based OSes like Core OS.
These operating systems support SSH like other Linux OSes, but have no
package manager, so they cannot run Linux benchmarks outside of Docker.
Because they cannot install packages, they only support VM life cycle
benchmarks like cluster_boot.
"""
def InstallPackages(self, package_name):
raise NotImplementedError('Container OSes have no package managers.')
def HasPackage(self, package: str) -> bool:
return False
# Install could theoretically be supported. A hermetic, architecture-
# appropriate binary could be copied into the VM and run.
# However, because curl, wget, and object store clients cannot be installed
# and may or may not be present, copying the binary is non-trivial, so simply
# block trying.
def Install(self, package_name):
raise NotImplementedError('Container OSes have no package managers.')
def Uninstall(self, package_name):
raise NotImplementedError('Container OSes have no package managers.')
def PrepareVMEnvironment(self):
# Don't try to install packages as normal, because it will fail.
pass
class BaseRhelMixin(BaseLinuxMixin):
"""Class holding RHEL/CentOS specific VM methods and attributes."""
# In all RHEL 8+ based distros, yum is an alias to dnf.
# dnf is backwards compatible with yum, but has some additional capabilities.
# For CentOS and RHEL 7 we override this to yum and do not pass dnf-only
# flags. The commands are similar enough that forking whole methods seemed
# unnecessary.
# This can be removed when Amazon Linux 2 is no longer supported by PKB.
PACKAGE_MANAGER = DNF
# OS_TYPE = os_types.RHEL
BASE_OS_TYPE = os_types.RHEL
# RHEL's command to create an initramfs image.
INIT_RAM_FS_CMD = 'sudo dracut --regenerate-all -f'
def OnStartup(self):
"""Eliminates the need to have a tty to run sudo commands."""
super().OnStartup()
self.RemoteHostCommand(
"echo 'Defaults:%s !requiretty' | sudo tee /etc/sudoers.d/pkb"
% self.user_name,
login_shell=True,
)
if FLAGS.gce_hpc_tools:
self.InstallGcpHpcTools()
# yum-cron can stall, causing yum commands to hang
if _DISABLE_YUM_CRON.value:
if self.PACKAGE_MANAGER == YUM:
self.RemoteHostCommand(
'sudo systemctl disable yum-cron.service', ignore_failure=True
)
elif self.PACKAGE_MANAGER == DNF:
self.RemoteHostCommand(
'sudo systemctl disable dnf-automatic.timer', ignore_failure=True
)
def InstallGcpHpcTools(self):
"""Installs the GCP HPC tools."""
self.Install('gce_hpc_tools')
def PackageCleanup(self):
"""Cleans up all installed packages.
Performs the normal package cleanup, then deletes the file
added to the /etc/sudoers.d directory during startup.
"""
super().PackageCleanup()
self.RemoteCommand('sudo rm /etc/sudoers.d/pkb')
def SnapshotPackages(self):
"""Grabs a snapshot of the currently installed packages."""
self.RemoteCommand(
'rpm -qa > %s/rpm_package_list' % linux_packages.INSTALL_DIR
)
def RestorePackages(self):
"""Restores the currently installed packages to those snapshotted."""
self.RemoteCommand(
'rpm -qa | grep --fixed-strings --line-regexp --invert-match --file '
'%s/rpm_package_list | xargs --no-run-if-empty sudo rpm -e'
% linux_packages.INSTALL_DIR,
ignore_failure=True,
)
def HasPackage(self, package):
"""Returns True iff the package is available for installation."""
return self.TryRemoteCommand(f'sudo {self.PACKAGE_MANAGER} info {package}')
# yum talks to the network on each request so transient issues may fix
# themselves on retry
@vm_util.Retry(max_retries=UPDATE_RETRIES)
def InstallPackages(self, packages):
"""Installs packages using the yum or dnf package managers."""
cmd = f'sudo {self.PACKAGE_MANAGER} install -y {packages}'
if self.PACKAGE_MANAGER == DNF:
cmd += ' --allowerasing'
self.RemoteCommand(cmd)
@vm_util.Retry(max_retries=UPDATE_RETRIES)
def InstallPackageGroup(self, package_group):
"""Installs a 'package group' using the yum package manager."""
cmd = f'sudo {self.PACKAGE_MANAGER} groupinstall -y "{package_group}"'
if self.PACKAGE_MANAGER == DNF:
cmd += ' --allowerasing'
self.RemoteCommand(cmd)
def Install(self, package_name):
"""Installs a PerfKit package on the VM."""
if not self.install_packages:
return
if package_name not in self._installed_packages:
package = linux_packages.PACKAGES[package_name]
if hasattr(package, 'YumInstall'):
package.YumInstall(self)
elif hasattr(package, 'Install'):
package.Install(self)
else:
raise KeyError(
'Package %s has no install method for RHEL.' % package_name
)
self._installed_packages.add(package_name)
def Uninstall(self, package_name):
"""Uninstalls a PerfKit package on the VM."""
package = linux_packages.PACKAGES[package_name]
if hasattr(package, 'YumUninstall'):
package.YumUninstall(self)
elif hasattr(package, 'Uninstall'):
package.Uninstall(self)
def GetPathToConfig(self, package_name):
"""Returns the path to the config file for PerfKit packages.
This function is mostly useful when config files locations
don't match across distributions (such as mysql). Packages don't
need to implement it if this is not the case.
"""
package = linux_packages.PACKAGES[package_name]
return package.YumGetPathToConfig(self)
def GetServiceName(self, package_name):
"""Returns the service name of a PerfKit package.
This function is mostly useful when service names don't
match across distributions (such as mongodb). Packages don't
need to implement it if this is not the case.
"""
package = linux_packages.PACKAGES[package_name]
return package.YumGetServiceName(self)
def SetupProxy(self):
"""Sets up proxy configuration variables for the cloud environment."""
super().SetupProxy()
if self.PACKAGE_MANAGER == YUM:
yum_proxy_file = '/etc/yum.conf'
elif self.PACKAGE_MANAGER == DNF:
yum_proxy_file = '/etc/dnf/dnf.conf'
if FLAGS.http_proxy:
self.RemoteCommand(
"echo -e 'proxy= %s' | sudo tee -a %s"
% (FLAGS.http_proxy, yum_proxy_file)
)
def AppendKernelCommandLine(self, command_line, reboot=True):
"""Appends the provided command-line to the VM and reboots by default."""
self.RemoteCommand(
r'echo GRUB_CMDLINE_LINUX_DEFAULT=\"\${GRUB_CMDLINE_LINUX_DEFAULT} %s\"'
' | sudo tee -a /etc/default/grub' % command_line
)
self.RemoteCommand('sudo grub2-mkconfig -o /boot/grub2/grub.cfg')
self.RemoteCommand('sudo grub2-mkconfig -o /etc/grub2.cfg')
if reboot:
self.Reboot()
class AmazonLinux2Mixin(BaseRhelMixin):
"""Class holding Amazon Linux 2 VM methods and attributes."""
OS_TYPE = os_types.AMAZONLINUX2
PACKAGE_MANAGER = YUM
def SetupPackageManager(self):
"""Install EPEL."""
# https://aws.amazon.com/premiumsupport/knowledge-center/ec2-enable-epel/
self.RemoteCommand('sudo amazon-linux-extras install epel -y')
class AmazonNeuronMixin(AmazonLinux2Mixin):
"""Class holding Neuron VM methods and attributes."""
OS_TYPE = os_types.AMAZON_NEURON
class AmazonLinux2023Mixin(BaseRhelMixin):
"""Class holding Amazon Linux 2023 VM methods and attributes."""
OS_TYPE = os_types.AMAZONLINUX2023
# Note no EPEL support
# https://docs.aws.amazon.com/linux/al2023/ug/compare-with-al2.html#epel
class Rhel8Mixin(BaseRhelMixin):
"""Class holding RHEL 8 specific VM methods and attributes."""
OS_TYPE = os_types.RHEL8
def SetupPackageManager(self):
"""Install EPEL."""
# https://docs.fedoraproject.org/en-US/epel/#_rhel_8
self.RemoteCommand(f'sudo dnf install -y {_EPEL_URL.format(8)}')
class Rhel9Mixin(BaseRhelMixin):
"""Class holding RHEL 9 specific VM methods and attributes."""
OS_TYPE = os_types.RHEL9
def SetupPackageManager(self):
"""Install EPEL."""
# https://docs.fedoraproject.org/en-US/epel/#_rhel_9
self.RemoteCommand(f'sudo dnf install -y {_EPEL_URL.format(9)}')
class Fedora36Mixin(BaseRhelMixin):
"""Class holding Fedora36 specific methods and attributes."""
OS_TYPE = os_types.FEDORA36
def SetupPackageManager(self):
"""Fedora does not need epel."""
class Fedora37Mixin(BaseRhelMixin):
"""Class holding Fedora37 specific methods and attributes."""
OS_TYPE = os_types.FEDORA37
def SetupPackageManager(self):
"""Fedora does not need epel."""
class CentOsStream9Mixin(BaseRhelMixin):
"""Class holding CentOS Stream 9 specific VM methods and attributes."""
OS_TYPE = os_types.CENTOS_STREAM9
def SetupPackageManager(self):
"""Install EPEL."""
# https://docs.fedoraproject.org/en-US/epel/#_centos_stream_9
self.RemoteCommand(
'sudo dnf config-manager --set-enabled crb && '
'sudo dnf install -y epel-release epel-next-release'
)
class RockyLinux8Mixin(BaseRhelMixin):
"""Class holding Rocky Linux 8 specific VM methods and attributes."""
OS_TYPE = os_types.ROCKY_LINUX8
def SetupPackageManager(self):
"""Install EPEL."""
# https://docs.fedoraproject.org/en-US/epel/#_almalinux_8_rocky_linux_8
self.RemoteCommand(
'sudo dnf config-manager --set-enabled powertools && '
'sudo dnf install -y epel-release'
)
class RockyLinux9Mixin(BaseRhelMixin):
"""Class holding Rocky Linux 8 specific VM methods and attributes."""
OS_TYPE = os_types.ROCKY_LINUX9
def SetupPackageManager(self):
"""Install EPEL."""
# https://docs.fedoraproject.org/en-US/epel/#_almalinux_9_rocky_linux_9
self.RemoteCommand(
'sudo dnf config-manager --set-enabled crb && '
'sudo dnf install -y epel-release'
)
class CoreOsMixin(BaseContainerLinuxMixin):
"""Class holding CoreOS Container Linux specific VM methods and attributes."""
OS_TYPE = os_types.CORE_OS
BASE_OS_TYPE = os_types.CORE_OS
class BaseDebianMixin(BaseLinuxMixin):
"""Class holding Debian specific VM methods and attributes."""
OS_TYPE = 'base-only'
BASE_OS_TYPE = os_types.DEBIAN
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Whether or not apt-get update has been called.
# We defer running apt-get update until the first request to install a
# package.
self._apt_updated = False
@vm_util.Retry(max_retries=UPDATE_RETRIES)
def AptUpdate(self):
"""Updates the package lists on VMs using apt."""
try:
# Set the timeout on apt-get to 10 minutes because it is known to get
# stuck. A normal update takes less than 30 seconds, but far-flung
# regions can be slower.
self.RemoteCommand('sudo apt-get update', timeout=600)
except errors.VirtualMachine.RemoteCommandError as e:
# If there is a problem, remove the lists in order to get rid of
# "Hash Sum mismatch" errors (the files will be restored when
# apt-get update is run again).
self.RemoteCommand('sudo rm -r /var/lib/apt/lists/*')
raise e
def SnapshotPackages(self):
"""Grabs a snapshot of the currently installed packages."""
self.RemoteCommand(
'dpkg --get-selections > %s/dpkg_selections'
% linux_packages.INSTALL_DIR
)
def RestorePackages(self):
"""Restores the currently installed packages to those snapshotted."""
self.RemoteCommand('sudo dpkg --clear-selections')
self.RemoteCommand(
'sudo dpkg --set-selections < %s/dpkg_selections'
% linux_packages.INSTALL_DIR
)
self.RemoteCommand(
"sudo DEBIAN_FRONTEND='noninteractive' "
'apt-get --purge -y dselect-upgrade'
)
def HasPackage(self, package):
"""Returns True iff the package is available for installation."""
if not self._apt_updated:
self.AptUpdate()
self._apt_updated = True
# apt-cache show will exit 0 for purely virtual packages.
# However, it always logs `N: No packages found` to STDOUT in that case.
stdout, stderr, retcode = self.RemoteCommandWithReturnCode(
'apt-cache --quiet=0 show ' + package, ignore_failure=True
)
return not retcode and 'No packages found' not in (stdout + stderr)
@vm_util.Retry()
def InstallPackages(self, packages):
"""Installs packages using the apt package manager."""
if not self.install_packages:
return
if not self._apt_updated:
self.AptUpdate()
self._apt_updated = True
try:
install_command = (
"sudo DEBIAN_FRONTEND='noninteractive' /usr/bin/apt-get -y install %s"
% (packages)
)
self.RemoteCommand(install_command)
except errors.VirtualMachine.RemoteCommandError as e:
# TODO(user): Remove the code below after Azure fixes their package
# repository, or add code to recover the sources.list.
self.RemoteCommand(
'sudo sed -i.bk "s/azure.archive.ubuntu.com/archive.ubuntu.com/g" '
'/etc/apt/sources.list'
)
logging.info(
'Installing "%s" failed on %s. This may be transient. '
'Updating package list.',
packages,
self,
)
self.AptUpdate()
raise e
def Install(self, package_name):
"""Installs a PerfKit package on the VM."""
if not self.install_packages:
return
if not self._apt_updated:
self.AptUpdate()
self._apt_updated = True
if package_name not in self._installed_packages:
package = linux_packages.PACKAGES[package_name]
if hasattr(package, 'AptInstall'):
package.AptInstall(self)
elif hasattr(package, 'Install'):
package.Install(self)
else:
raise KeyError(
'Package %s has no install method for Debian.' % package_name
)
self._installed_packages.add(package_name)
def Uninstall(self, package_name):
"""Uninstalls a PerfKit package on the VM."""
package = linux_packages.PACKAGES[package_name]
if hasattr(package, 'AptUninstall'):
package.AptUninstall(self)
elif hasattr(package, 'Uninstall'):
package.Uninstall(self)
self._installed_packages.discard(package_name)
def GetPathToConfig(self, package_name):
"""Returns the path to the config file for PerfKit packages.
This function is mostly useful when config files locations
don't match across distributions (such as mysql). Packages don't
need to implement it if this is not the case.
Args:
package_name: the name of the package.
"""
package = linux_packages.PACKAGES[package_name]
return package.AptGetPathToConfig(self)
def GetServiceName(self, package_name):
"""Returns the service name of a PerfKit package.
This function is mostly useful when service names don't
match across distributions (such as mongodb). Packages don't
need to implement it if this is not the case.
Args:
package_name: the name of the package.
"""
package = linux_packages.PACKAGES[package_name]
return package.AptGetServiceName(self)
def SetupProxy(self):
"""Sets up proxy configuration variables for the cloud environment."""
super().SetupProxy()
apt_proxy_file = '/etc/apt/apt.conf'
commands = []
if FLAGS.http_proxy:
commands.append(
'echo -e \'Acquire::http::proxy "%s";\' |sudo tee -a %s'
% (FLAGS.http_proxy, apt_proxy_file)
)
if FLAGS.https_proxy:
commands.append(
'echo -e \'Acquire::https::proxy "%s";\' |sudo tee -a %s'
% (FLAGS.https_proxy, apt_proxy_file)
)
if commands:
self.RemoteCommand(';'.join(commands))
def IncreaseSSHConnection(self, target):
"""Increase maximum number of ssh connections on vm.
Args:
target: int. The max number of ssh connection.
"""
self.RemoteCommand(
r'sudo sed -i -e "s/.*MaxStartups.*/MaxStartups {}/" '
'/etc/ssh/sshd_config'.format(target)
)
self.RemoteCommand('sudo service ssh restart')
def AppendKernelCommandLine(self, command_line, reboot=True):
"""Appends the provided command-line to the VM and reboots by default."""
self.RemoteCommand(
r'echo GRUB_CMDLINE_LINUX_DEFAULT=\"\${GRUB_CMDLINE_LINUX_DEFAULT} %s\"'
r' | sudo tee -a /etc/default/grub' % command_line
)
self.RemoteCommand('sudo update-grub')
if reboot:
self.Reboot()
class Debian11Mixin(BaseDebianMixin, os_mixin.DeprecatedOsMixin):
"""Class holding Debian 11 specific VM methods and attributes."""
OS_TYPE = os_types.DEBIAN11
ALTERNATIVE_OS = os_types.DEBIAN12
END_OF_LIFE = '2026-08-31'
def PrepareVMEnvironment(self):
# Missing in some images. Required by PrepareVMEnvironment to determine
# partitioning.
self.InstallPackages('fdisk')
super().PrepareVMEnvironment()
class Debian12Mixin(BaseDebianMixin):
"""Class holding Debian 12 specific VM methods and attributes."""
OS_TYPE = os_types.DEBIAN12
def PrepareVMEnvironment(self):
# Missing in some images. Required by PrepareVMEnvironment to determine
# partitioning.
self.InstallPackages('fdisk')
super().PrepareVMEnvironment()
class Debian11BackportsMixin(Debian11Mixin):
"""Debian 11 with backported kernel."""
OS_TYPE = os_types.DEBIAN11_BACKPORTS
class BaseUbuntuMixin(BaseDebianMixin):
"""Class holding Ubuntu specific VM methods and attributes."""
def AppendKernelCommandLine(self, command_line, reboot=True):
"""Appends the provided command-line to the VM and reboots by default."""
self.RemoteCommand(
r'echo GRUB_CMDLINE_LINUX_DEFAULT=\"\${GRUB_CMDLINE_LINUX_DEFAULT} %s\"'
r' | sudo tee -a /etc/default/grub.d/50-cloudimg-settings.cfg'
% command_line
)
self.RemoteCommand('sudo update-grub')
if reboot:
self.Reboot()
class Ubuntu2004Mixin(BaseUbuntuMixin):
"""Class holding Ubuntu2004 specific VM methods and attributes."""
OS_TYPE = os_types.UBUNTU2004
def UpdateEnvironmentPath(self):
"""Add /snap/bin to default search path for Ubuntu2004.
See https://bugs.launchpad.net/snappy/+bug/1659719.
"""
# Ensure ~/.bashrc exists.
self.RemoteCommand(
r'touch ~/.bashrc && sed -i "1 i\export PATH=$PATH:/snap/bin" ~/.bashrc'
)
self.RemoteCommand(
r'sudo sed -i "1 i\export PATH=$PATH:/snap/bin" /etc/bash.bashrc'
)
class Ubuntu2004EfaMixin(Ubuntu2004Mixin):
"""Class holding EFA specific VM methods and attributes."""
OS_TYPE = os_types.UBUNTU2004_EFA
class Ubuntu2004DLMixin(Ubuntu2004Mixin):
"""Class holding DeepLearning specific VM methods and attributes."""
OS_TYPE = os_types.UBUNTU2004_DL
def OnStartup(self):
super().OnStartup()
self.RemoteCommand('sudo chmod -R 755 /var/lib/nvidia')
self.RemoteCommand('sudo chown $USER:$USER /var/lib/nvidia')
self.RemoteCommand('mkdir -p /var/lib/nvidia/lib64')
def UpdateDockerfile(self, unused_dockerfile):
"""Add provider specific instructions to a docker file.
Args:
unused_dockerfile: Path to dockerfile on remote VMs.
"""
pass
class Debian12DLMixin(Debian12Mixin):
"""Class holding DeepLearning specific VM methods and attributes."""
OS_TYPE = os_types.DEBIAN12_DL
def OnStartup(self):
super().OnStartup()
self.RemoteCommand('sudo chmod -R 755 /var/lib/nvidia')
self.RemoteCommand('sudo chown $USER:$USER /var/lib/nvidia')
self.RemoteCommand('mkdir -p /var/lib/nvidia/lib64')
def UpdateDockerfile(self, unused_dockerfile):
"""Add provider specific instructions to a docker file.
Args:
unused_dockerfile: Path to dockerfile on remote VMs.
"""
pass
class AmazonLinux2DLMixin(AmazonLinux2Mixin):
"""Class holding DLAMI specific VM methods and attributes."""
OS_TYPE = os_types.AMAZONLINUX2_DL
class Ubuntu2204Mixin(BaseUbuntuMixin):
"""Class holding Ubuntu 22.04 specific VM methods and attributes."""
OS_TYPE = os_types.UBUNTU2204
class Ubuntu2404Mixin(BaseUbuntuMixin):
"""Class holding Ubuntu 24.04 specific VM methods and attributes."""
OS_TYPE = os_types.UBUNTU2404
class ContainerizedDebianMixin(BaseDebianMixin):
"""DEPRECATED mixin with no current implementations.
Class representing a Containerized Virtual Machine.
A Containerized Virtual Machine is a VM that runs remote commands
within a Docker Container.
Any call to RemoteCommand() will be run within the container
whereas any call to RemoteHostCommand() will be run in the VM itself.
"""
BASE_DOCKER_IMAGE = 'ubuntu:xenial'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.docker_id = None
def _CheckDockerExists(self):
"""Returns whether docker is installed or not."""
resp, _ = self.RemoteHostCommand('command -v docker', ignore_failure=True)
return bool(resp.rstrip())
def PrepareVMEnvironment(self):
"""Initializes docker before proceeding with preparation."""
if not self._CheckDockerExists():
self.Install('docker')
# We need to explicitly create VM_TMP_DIR in the host because
# otherwise it will be implicitly created by Docker in InitDocker()
# (because of the -v option) and owned by root instead of perfkit,
# causing permission problems.
self.RemoteHostCommand('mkdir -p %s' % vm_util.VM_TMP_DIR)
self.InitDocker()
# This will create the VM_TMP_DIR in the container.
# Has to be done after InitDocker() because it needs docker_id.
self._CreateVmTmpDir()
super().PrepareVMEnvironment()
def InitDocker(self):
"""Initializes the docker container daemon."""
init_docker_cmd = [
'sudo docker run -d --rm --net=host --workdir=%s -v %s:%s '
% (CONTAINER_WORK_DIR, vm_util.VM_TMP_DIR, CONTAINER_MOUNT_DIR)
]
for sd in self.scratch_disks:
init_docker_cmd.append('-v %s:%s ' % (sd.mount_point, sd.mount_point))
init_docker_cmd.append('%s sleep infinity ' % self.BASE_DOCKER_IMAGE)
init_docker_cmd = ''.join(init_docker_cmd)
resp, _ = self.RemoteHostCommand(init_docker_cmd)
self.docker_id = resp.rstrip()
return self.docker_id
def RemoteCommand(self, command, **kwargs):
"""Runs a command inside the container.
Args:
command: A valid bash command.
**kwargs: Keyword arguments passed directly to RemoteHostCommand.
Returns:
A tuple of stdout and stderr from running the command.
"""
# Escape single quotes so the command survives the bash -c '...' wrapper.
command = command.replace("'", r"'\''")
logging.info('Docker running: %s', command, stacklevel=2)
command = "sudo docker exec %s bash -c '%s'" % (self.docker_id, command)
return self.RemoteHostCommand(command, **kwargs)
def ContainerCopy(self, file_name, container_path='', copy_to=True):
"""Copies a file to or from container_path to the host's vm_util.VM_TMP_DIR.
Args:
file_name: Name of the file in the host's vm_util.VM_TMP_DIR.
container_path: Optional path of where to copy file on container.
copy_to: True to copy to container, False to copy from container.
Raises:
RemoteExceptionError: If the source container_path is blank.
"""
if copy_to:
if container_path == '':
container_path = CONTAINER_WORK_DIR
# Everything in vm_util.VM_TMP_DIR is directly accessible
# both in the host and in the container
source_path = posixpath.join(CONTAINER_MOUNT_DIR, file_name)
command = 'cp %s %s' % (source_path, container_path)
self.RemoteCommand(command)
else:
if container_path == '':
raise errors.VirtualMachine.RemoteExceptionError(
'Cannot copy from blank target'
)
destination_path = posixpath.join(CONTAINER_MOUNT_DIR, file_name)
command = 'cp %s %s' % (container_path, destination_path)
self.RemoteCommand(command)
@vm_util.Retry(
poll_interval=1,
max_retries=3,
retryable_exceptions=(errors.VirtualMachine.RemoteCommandError,),
)
def RemoteCopy(self, file_path, remote_path='', copy_to=True):
"""Copies a file to or from the container in the remote VM.
Args:
file_path: Local path to file.
remote_path: Optional path of where to copy file inside the container.
copy_to: True to copy to VM, False to copy from VM.
"""
if copy_to:
file_name = os.path.basename(file_path)
tmp_path = posixpath.join(vm_util.VM_TMP_DIR, file_name)
self.RemoteHostCopy(file_path, tmp_path, copy_to)
self.ContainerCopy(file_name, remote_path, copy_to)
else:
file_name = posixpath.basename(remote_path)
tmp_path = posixpath.join(vm_util.VM_TMP_DIR, file_name)
self.ContainerCopy(file_name, remote_path, copy_to)
self.RemoteHostCopy(file_path, tmp_path, copy_to)
def MoveFile(self, target, source_path, remote_path=''):
"""Copies a file from one VM to a target VM.
Copies a file from a container in the source VM to a container
in the target VM.
Args:
target: The target ContainerizedVirtualMachine object.
source_path: The location of the file on the REMOTE machine.
remote_path: The destination of the file on the TARGET machine, default is
the root directory.
"""
file_name = posixpath.basename(source_path)
# Copies the file to vm_util.VM_TMP_DIR in source
self.ContainerCopy(file_name, source_path, copy_to=False)
# Moves the file to vm_util.VM_TMP_DIR in target
source_host_path = posixpath.join(vm_util.VM_TMP_DIR, file_name)
target_host_dir = vm_util.VM_TMP_DIR
self.MoveHostFile(target, source_host_path, target_host_dir)
# Copies the file to its final destination in the container
target.ContainerCopy(file_name, remote_path)
def SnapshotPackages(self):
"""Grabs a snapshot of the currently installed packages."""
pass
def PackageCleanup(self):
"""Cleans up all installed packages.
Stop the docker container launched with --rm.
"""
if self.docker_id:
self.RemoteHostCommand('docker stop %s' % self.docker_id)
def _ParseTextProperties(text, key_value_regex=_COLON_SEPARATED_RE):
"""Parses raw text that has lines in "key:value" form.
When an empty line is encountered, yields a dict of the values collected
so far.
Args:
text: Text of lines in "key:value" form.
key_value_regex: Regex to use to parse key and value from each line of text.
Yields:
Dict of [key,value] values for a section.
"""
current_data = {}
for line in (line.strip() for line in text.splitlines()):
if line:
m = key_value_regex.match(line)
if m:
current_data[m.group('key')] = m.group('value')
else:
logging.debug('Ignoring bad line "%s"', line)
else:
# Hit a section break
if current_data:
yield current_data
current_data = {}
if current_data:
yield current_data
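# Illustrative example (hypothetical input): parsing the text
#   'a: 1\nb: 2\n\nc: 3'
# yields two dicts, {'a': '1', 'b': '2'} and {'c': '3'}, because the blank
# line is treated as a section break.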
def CreateUlimitSamples(
vms: list['BaseLinuxVirtualMachine'],
) -> list[sample.Sample]:
"""Creates samples from linux VMs of ulimit output."""
samples = []
for vm in vms:
metadata = {'node_name': vm.name}
metadata.update(vm.CheckUlimit().data)
samples.append(sample.Sample('ulimit', 0, '', metadata))
return samples
class UlimitResults:
"""Holds the contents of the command ulimit."""
def __init__(self, ulimit: str):
"""UlimitResults Constructor.
The ulimit command does *not* have any option for
json output, so keep on using the text format.
Args:
ulimit: A string in the format of "ulimit -a" command
Raises:
ValueError: if the format of ulimit isn't what was expected for parsing
Example value of ulimit is:
real-time non-blocking time (microseconds, -R) unlimited
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 772515
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 131072
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 131072
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
"""
self.data = {}
for stanza in _ParseTextProperties(ulimit, _BRACKET_SEPARATED_RE):
self.data.update(stanza)
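# Illustrative parse (taken from the example above): for the line
# 'open files (-n) 131072', _BRACKET_SEPARATED_RE captures everything up to
# the closing parenthesis as the key, so
# self.data['open files (-n)'] == '131072'.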
def CreateLscpuSamples(vms):
"""Creates samples from linux VMs of lscpu output."""
samples = []
for vm in vms:
if vm.OS_TYPE in os_types.LINUX_OS_TYPES:
metadata = {'node_name': vm.name}
metadata.update(vm.CheckLsCpu().data)
samples.append(sample.Sample('lscpu', 0, '', metadata))
return samples
class LsCpuResults:
"""Holds the contents of the command lscpu."""
def __init__(self, lscpu):
"""LsCpuResults Constructor.
The lscpu command on Ubuntu 16.04 does *not* have the "--json" option for
json output, so keep on using the text format.
Args:
lscpu: A string in the format of "lscpu" command
Raises:
ValueError: if the format of lscpu isn't what was expected for parsing
Example value of lscpu is:
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 12
On-line CPU(s) list: 0-11
Thread(s) per core: 2
Core(s) per socket: 6
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 79
Stepping: 1
CPU MHz: 1202.484
BogoMIPS: 7184.10
Virtualization: VT-x
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 15360K
NUMA node0 CPU(s): 0-11
"""
self.data = {}
for stanza in _ParseTextProperties(lscpu):
self.data.update(stanza)
def GetInt(key):
if key in self.data and self.data[key].isdigit():
return int(self.data[key])
raise ValueError(
'Could not find integer "{}" in {}'.format(key, sorted(self.data))
)
self.numa_node_count = GetInt('NUMA node(s)')
self.cores_per_socket = GetInt('Core(s) per socket')
self.socket_count = GetInt('Socket(s)')
self.threads_per_core = GetInt('Thread(s) per core')
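# For the example lscpu output in the docstring above, this yields
# numa_node_count=1, cores_per_socket=6, socket_count=1, and
# threads_per_core=2.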
def CreateProcCpuSamples(vms):
"""Creates samples from linux VMs of lscpu output."""
samples = []
for vm in vms:
if vm.OS_TYPE not in os_types.LINUX_OS_TYPES:
continue
data = vm.CheckProcCpu()
metadata = {'node_name': vm.name}
metadata.update(data.GetValues())
samples.append(sample.Sample('proccpu', 0, '', metadata))
metadata = {'node_name': vm.name}
for processor_id, raw_values in data.mappings.items():
values = ['%s=%s' % item for item in raw_values.items()]
metadata['proc_{}'.format(processor_id)] = ';'.join(sorted(values))
samples.append(sample.Sample('proccpu_mapping', 0, '', metadata))
return samples
class ProcCpuResults:
"""Parses /proc/cpuinfo text into grouped values.
Most of the cpuinfo is repeated per processor. Known ones that change per
processor are listed in _PER_CPU_KEYS and are processed separately to make
reporting easier.
Example metadata for metric='proccpu':
|bugs:spec_store_bypass spectre_v1 spectre_v2 swapgs|,
|cache size:25344 KB|
Example metadata for metric='proccpu_mapping':
|proc_0:apicid=0;core id=0;initial apicid=0;physical id=0|,
|proc_1:apicid=2;core id=1;initial apicid=2;physical id=0|
Attributes:
text: The /proc/cpuinfo text.
mappings: Dict of [processor id: dict of values that change with cpu]
attributes: Dict of /proc/cpuinfo entries that are not in mappings.
"""
# known attributes that vary with the processor id
_PER_CPU_KEYS = ['core id', 'initial apicid', 'apicid', 'physical id']
# attributes that should be sorted, for example turning the 'flags' value
# of "popcnt avx512bw" to "avx512bw popcnt"
_SORT_VALUES = ['flags', 'bugs']
def __init__(self, text):
self.mappings = {}
self.attributes = collections.defaultdict(set)
for stanza in _ParseTextProperties(text):
processor_id, single_values, multiple_values = self._ParseStanza(stanza)
if processor_id is None: # can be 0
continue
if processor_id in self.mappings:
logging.warning('Processor id %s seen twice in %s', processor_id, text)
continue
self.mappings[processor_id] = single_values
for key, value in multiple_values.items():
self.attributes[key].add(value)
def GetValues(self):
"""Dict of cpuinfo keys to its values.
Multiple values are joined by semicolons.
Returns:
Dict of [cpuinfo key:value string]
"""
cpuinfo = {
key: ';'.join(sorted(values)) for key, values in self.attributes.items()
}
cpuinfo['proccpu'] = ','.join(sorted(self.attributes.keys()))
return cpuinfo
def _ParseStanza(self, stanza):
"""Parses the cpuinfo section for an individual CPU.
Args:
stanza: Dict of the /proc/cpuinfo results for an individual CPU.
Returns:
Tuple of (processor_id, dict of values that are known to change with
each CPU, dict of other cpuinfo results).
"""
singles = {}
if 'processor' not in stanza:
return None, None, None
processor_id = int(stanza.pop('processor'))
for key in self._PER_CPU_KEYS:
if key in stanza:
singles[key] = stanza.pop(key)
for key in self._SORT_VALUES:
if key in stanza:
stanza[key] = ' '.join(sorted(stanza[key].split()))
return processor_id, singles, stanza
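# Illustrative example (hypothetical stanza): {'processor': '0',
# 'core id': '0', 'flags': 'popcnt avx512bw'} returns processor_id 0,
# singles {'core id': '0'}, and a remaining stanza whose flags are sorted
# to 'avx512bw popcnt'.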
class JujuMixin(BaseDebianMixin):
"""DEPRECATED mixin with no current implementations.
Class to allow running Juju-deployed workloads.
Bootstraps a Juju environment using the manual provider:
https://jujucharms.com/docs/stable/config-manual
"""
# TODO: Add functionality to tear down and uninstall Juju
# (for pre-provisioned) machines + JujuUninstall for packages using charms.
is_controller = False
# A reference to the juju controller, useful when operations occur against
# a unit's VM but need to be performed from the controller.
controller = None
vm_group = None
machines = {}
units = []
installation_lock = threading.Lock()
environments_yaml = """
default: perfkit
environments:
perfkit:
type: manual
bootstrap-host: {0}
"""
def _Bootstrap(self):
"""Bootstrap a Juju environment."""
resp, _ = self.RemoteHostCommand('juju bootstrap')
def JujuAddMachine(self, unit):
"""Adds a manually-created virtual machine to Juju.
Args:
unit: An object representing the unit's BaseVirtualMachine.
"""
resp, stderr = self.RemoteHostCommand(
'juju add-machine ssh:%s' % unit.internal_ip
)
# We don't know what the machine's going to be used for yet,
# but track its placement for easier access later.
# We're looking for the output: created machine %d
machine_id = stderr[stderr.rindex(' ') :].strip()
self.machines[machine_id] = unit
def JujuConfigureEnvironment(self):
"""Configure a bootstrapped Juju environment."""
if self.is_controller:
resp, _ = self.RemoteHostCommand('mkdir -p ~/.juju')
with vm_util.NamedTemporaryFile() as tf:
tf.write(self.environments_yaml.format(self.internal_ip))
tf.close()
self.PushFile(tf.name, '~/.juju/environments.yaml')
def JujuEnvironment(self):
"""Get the name of the current environment."""
output, _ = self.RemoteHostCommand('juju switch')
return output.strip()
def JujuRun(self, cmd):
"""Run a command on the virtual machine.
Args:
cmd: The command to run.
"""
output, _ = self.RemoteHostCommand(cmd)
return output.strip()
def JujuStatus(self, pattern=''):
"""Return the status of the Juju environment.
Args:
pattern: Optionally match machines/services with a pattern.
"""
output, _ = self.RemoteHostCommand('juju status %s --format=json' % pattern)
return output.strip()
def JujuVersion(self):
"""Return the Juju version."""
output, _ = self.RemoteHostCommand('juju version')
return output.strip()
def JujuSet(self, service, params=None):
"""Sets the configuration options on a deployed service.
Args:
service: The name of the service.
params: A list of key=value pairs.
"""
params = params or []
output, _ = self.RemoteHostCommand(
'juju set %s %s' % (service, ' '.join(params))
)
return output.strip()
@vm_util.Retry(poll_interval=30, timeout=3600)
def JujuWait(self):
"""Wait for all deployed services to be installed, configured, and idle."""
status = yaml.safe_load(self.JujuStatus())
for service in status['services']:
ss = status['services'][service]['service-status']['current']
# Check for the error state first; otherwise the generic not-ready check
# below would shadow it and the service could never fail fast.
if ss == 'error':
# The service has failed to deploy.
debuglog = self.JujuRun('juju debug-log --limit 200')
logging.warning(debuglog)
raise errors.Juju.UnitErrorException(
'Service %s is in an error state' % service
)
# A service that is merely not ready (e.g. blocked while waiting on a
# relation) raises a retryable TimeoutException, so it is polled again.
if ss not in ['active', 'unknown']:
raise errors.Juju.TimeoutException(
'Service %s is not ready; status is %s' % (service, ss)
)
for unit in status['services'][service]['units']:
unit_data = status['services'][service]['units'][unit]
ag = unit_data['agent-state']
if ag != 'started':
raise errors.Juju.TimeoutException(
'Service %s is not ready; agent-state is %s' % (service, ag)
)
ws = unit_data['workload-status']['current']
if ws not in ['active', 'unknown']:
raise errors.Juju.TimeoutException(
'Service %s is not ready; workload-state is %s' % (service, ws)
)
def JujuDeploy(self, charm, vm_group):
"""Deploy (and scale) this service to the machines in its vm group.
Args:
charm: The charm to deploy, i.e., cs:trusty/ubuntu.
vm_group: The name of vm_group the unit(s) should be deployed to.
"""
# Find the already-deployed machines belonging to this vm_group
machines = []
for machine_id, unit in self.machines.items():
if unit.vm_group == vm_group:
machines.append(machine_id)
# Deploy the first machine
resp, _ = self.RemoteHostCommand(
'juju deploy %s --to %s' % (charm, machines.pop())
)
# Get the name of the service
service = charm[charm.rindex('/') + 1 :]
# Deploy to the remaining machine(s)
for machine in machines:
resp, _ = self.RemoteHostCommand(
'juju add-unit %s --to %s' % (service, machine)
)
def JujuRelate(self, service1, service2):
"""Create a relation between two services.
Args:
service1: The first service to relate.
service2: The second service to relate.
"""
resp, _ = self.RemoteHostCommand(
'juju add-relation %s %s' % (service1, service2)
)
def Install(self, package_name):
"""Installs a PerfKit package on the VM."""
package = linux_packages.PACKAGES[package_name]
try:
if self.controller is None:
raise ValueError('self.controller is None')
# Make sure another unit doesn't try
# to install the charm at the same time
with self.controller.installation_lock:
if package_name not in self.controller._installed_packages:
package.JujuInstall(self.controller, self.vm_group)
self.controller._installed_packages.add(package_name)
except AttributeError as e:
logging.warning(
'Failed to install package %s, falling back to Apt (%s)',
package_name,
e,
)
if package_name not in self._installed_packages:
if hasattr(package, 'AptInstall'):
package.AptInstall(self)
elif hasattr(package, 'Install'):
package.Install(self)
else:
raise KeyError(
'Package %s has no install method for Juju machines.'
% package_name
)
self._installed_packages.add(package_name)
def SetupPackageManager(self):
if self.is_controller:
resp, _ = self.RemoteHostCommand(
'sudo add-apt-repository ppa:juju/stable'
)
super().SetupPackageManager()
def PrepareVMEnvironment(self):
"""Install and configure a Juju environment."""
super().PrepareVMEnvironment()
if self.is_controller:
self.InstallPackages('juju')
self.JujuConfigureEnvironment()
self.AuthenticateVm()
self._Bootstrap()
# Install the Juju agent on the other VMs
for unit in self.units:
unit.controller = self
self.JujuAddMachine(unit)
class BaseLinuxVirtualMachine(
BaseLinuxMixin, virtual_machine.BaseVirtualMachine
):
"""Linux VM for use with pytyping."""