perfkitbenchmarker/os_mixin.py (362 lines of code) (raw):

# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """OS Mixin class, which defines abstract properties of an OS. "Mixin" because most real implementations will inherit from a child of both BaseOSMixin & BaseVirtualMachine. eg CentOs7BasedAzureVirtualMachine inherits from AzureVirtualMachine which inherits from BaseVirtualMachine & CentOs7Mixin which inherits from BaseOSMixin. Implementation & details are somewhat confusingly coupled with BaseVirtualMachine. """ import abc import contextlib import logging import os.path import socket import time from typing import Any, Dict, List, Tuple, Union import uuid from absl import flags import jinja2 from perfkitbenchmarker import background_workload from perfkitbenchmarker import command_interface from perfkitbenchmarker import data from perfkitbenchmarker import disk from perfkitbenchmarker import errors from perfkitbenchmarker import events from perfkitbenchmarker import os_types from perfkitbenchmarker import vm_util FLAGS = flags.FLAGS # pylint: disable=g-missing-from-attributes,invalid-name class BaseOsMixin(command_interface.CommandInterface, metaclass=abc.ABCMeta): """The base class for OS Mixin classes. This class holds VM methods and attributes relating to the VM's guest OS. For methods and attributes that relate to the VM as a cloud resource, see BaseVirtualMachine and its subclasses. Attributes: bootable_time: The time when the VM finished booting. hostname: The VM's hostname. remote_access_ports: A list of ports which must be opened on the firewall in order to access the VM. """ # Represents whether the VM type can be (cleanly) rebooted. Should be false # for a class if rebooting causes issues, e.g. for KubernetesVirtualMachine # needing to reboot often indicates a design problem since restarting a # container can have side effects in certain situations. IS_REBOOTABLE = True password: str = None install_packages: bool # mixed from BaseVirtualMachine is_static: bool # mixed from BaseVirtualMachine scratch_disks: List[disk.BaseDisk] # mixed from BaseVirtualMachine name: str # mixed from BaseVirtualMachine ssh_private_key: str # mixed from BaseVirtualMachine proxy_jump: str # mixed from BaseVirtualMachine user_name: str # mixed from BaseVirtualMachine disable_interrupt_moderation: str # mixed from BaseVirtualMachine disable_rss: str # mixed from BaseVirtualMachine num_disable_cpus: int # mixed from BaseVirtualMachine ip_address: str # mixed from BaseVirtualMachine internal_ip: str # mixed from BaseVirtualMachine can_connect_via_internal_ip: bool # mixed from BaseVirtualMachine boot_completion_ip_subset: bool # mixed from BaseVirtualMachine @abc.abstractmethod def GetConnectionIp(self): """See BaseVirtualMachine.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._installed_packages = set() self.startup_script_output = None self.postrun_script_output = None self.bootable_time = None self.port_listening_time = None self.hostname = None self.proxy_jump = None # Ports that will be opened by benchmark_spec to permit access to the VM. self.remote_access_ports = [] # Port to be used to see if VM is ready to receive a remote command. self.primary_remote_access_port = None # Cached values self._reachable = {} self._total_memory_kb = None self.num_cpus: int = None self._is_smt_enabled = None # Update to Json type if ever available: # https://github.com/python/typing/issues/182 self.os_metadata: Dict[str, Union[str, int, list[str]]] = {} assert ( type(self).BASE_OS_TYPE in os_types.BASE_OS_TYPES ), '%s is not in %s' % (type(self).BASE_OS_TYPE, os_types.BASE_OS_TYPES) @property @classmethod @abc.abstractmethod def OS_TYPE(cls): raise NotImplementedError() @property @classmethod @abc.abstractmethod def BASE_OS_TYPE(cls): raise NotImplementedError() def GetOSResourceMetadata(self) -> Dict[str, Union[str, int, list[str]]]: """Returns a dict containing VM OS metadata. Returns: dict mapping string property key to value. """ return self.os_metadata def RunCommand( self, command: str | list[str], ignore_failure: bool = False, should_pre_log: bool = True, stack_level: int = 1, timeout: float | None = None, **kwargs: Any, ) -> tuple[str, str, int]: """Runs a command. Additional args can be supplied & are passed to lower level functions but aren't required. Args: command: A valid bash command in string form. ignore_failure: Ignore any failure if set to true. should_pre_log: Whether to print the command being run or not. stack_level: Number of stack frames to skip & get an "interesting" caller, for logging. 1 skips this function, 2 skips this & its caller, etc.. timeout: The time to wait in seconds for the command before exiting. None means no timeout. **kwargs: Additional command arguments. Returns: A tuple of stdout and stderr from running the command. Raises: RemoteCommandError: If there was a problem issuing the command. """ raise NotImplementedError() def RemoteCommand( self, command: str, ignore_failure: bool = False, timeout: float | None = None, **kwargs, ) -> Tuple[str, str]: """Runs a command on the VM. Derived classes may add additional kwargs if necessary, but they should not be used outside of the class itself since they are non standard. Args: command: A valid bash command. ignore_failure: Ignore any failure if set to true. timeout: The time to wait in seconds for the command before exiting. None means no timeout. **kwargs: Additional command arguments. Returns: A tuple of stdout and stderr from running the command. Raises: RemoteCommandError: If there was a problem issuing the command. """ raise NotImplementedError() def RemoteCommandWithReturnCode( self, *args, **kwargs ) -> Tuple[str, str, int]: """Runs a command on the VM & returns error code. Args: *args: Generic arguments, lining up with RemoteCommand. **kwargs: Additional arguments, including anything additional from child implementations. Returns: A tuple of stdout, stderr, return_code from running the command. Raises: RemoteCommandError: If there was a problem establishing the connection. """ raise NotImplementedError() def RobustRemoteCommand( self, command: str, timeout: float | None = None, ignore_failure: bool = False, ) -> Tuple[str, str]: """Runs a command on the VM in a more robust way than RemoteCommand. The default should be to call RemoteCommand and log that it is not yet implemented. This function should be overwritten it is decendents. Args: command: The command to run. timeout: The timeout for the command in seconds. ignore_failure: Ignore any failure if set to true. Returns: A tuple of stdout, stderr from running the command. Raises: RemoteCommandError: If there was a problem establishing the connection, or the command fails. """ logging.info('RobustRemoteCommand not implemented, using RemoteCommand.') return self.RemoteCommand(command, ignore_failure, timeout) def TryRemoteCommand(self, command: str, **kwargs): """Runs a remote command and returns True iff it succeeded.""" try: self.RemoteCommand(command, **kwargs) return True except errors.VirtualMachine.RemoteCommandError: return False except: raise def Reboot(self): """Reboots the VM. Returns: The duration in seconds from the time the reboot command was issued to the time we could SSH into the VM and verify that the timestamp changed. Raises: Exception: If trying to reboot a VM that isn't rebootable (e.g. Kubernetes). """ if not self.IS_REBOOTABLE: raise errors.VirtualMachine.VirtualMachineError( "Trying to reboot a VM that isn't rebootable." ) vm_bootable_time = None # Use self.bootable_time to determine if this is the first boot. # On the first boot, WaitForBootCompletion will only run once. # On subsequent boots, need to WaitForBootCompletion and ensure # the last boot time changed. if self.bootable_time is not None: vm_bootable_time = self.VMLastBootTime() before_reboot_timestamp = time.time() self._Reboot() while True: self.WaitForBootCompletion() # WaitForBootCompletion ensures that the machine is up # this is sufficient check for the first boot - but not for a reboot if vm_bootable_time != self.VMLastBootTime(): break reboot_duration_sec = time.time() - before_reboot_timestamp self._AfterReboot() return reboot_duration_sec def _BeforeSuspend(self): pass def _PostSuspend(self): pass def Suspend(self) -> float: """Suspends the vm. Future plans and edge cases: checking if a vm is suspendable. Accidentally suspending a VM that is already suspending. Trying to resume a VM that is not already suspended. Returns: The amount of time it takes to Suspend a VM that is suspendable. """ self._BeforeSuspend() before_suspend_timestamp = time.time() self._Suspend() self._PostSuspend() return time.time() - before_suspend_timestamp def Resume(self) -> float: """Resumes the vm. Returns: The amount of time it takes to resume a VM that is suspendable. """ before_resume_timestamp = time.time() self._Resume() # WaitForSSH tries to ssh into the VM,ensuring resume was successful self._WaitForSSH() return time.time() - before_resume_timestamp def _Reboot(self): """OS-specific implementation of reboot command.""" raise NotImplementedError() def _Suspend(self): """Provider specific implementation of a VM suspend command.""" raise NotImplementedError() def _Resume(self): """Provider specific implementation of a VM resume command.""" raise NotImplementedError() def _AfterReboot(self): """Performs any OS-specific setup on the VM following reboot. This will be called after every call to Reboot(). """ pass def Start(self) -> float: """Starts the VM. Returns: The duration in seconds from the time the start command was issued to the time we could SSH into the VM and verify that the timestamp changed. """ before_start_timestamp = time.time() self._Start() self._PostStart() self._WaitForSSH() start_duration_sec = time.time() - before_start_timestamp return start_duration_sec @abc.abstractmethod def _Start(self): """Provider-specific implementation of start command.""" raise NotImplementedError() def _PostStart(self): """Provider-specific checks after start command.""" pass def Stop(self) -> float: """Stop the VM. Returns: The duration in seconds from the time the start command was issued to after the API call """ before_stop_timestamp = time.time() self._Stop() self._PostStop() stop_duration_sec = time.time() - before_stop_timestamp return stop_duration_sec def _Stop(self): """Provider-specific implementation of stop command.""" raise NotImplementedError() def _PostStop(self): """Provider-specific checks after stop command.""" pass def RemoteCopy(self, file_path, remote_path='', copy_to=True): """Copies a file to or from the VM. Args: file_path: Local path to file. remote_path: Optional path of where to copy file on remote host. copy_to: True to copy to vm, False to copy from vm. Raises: RemoteCommandError: If there was a problem copying the file. """ raise NotImplementedError() def WaitForBootCompletion(self): """Waits until VM is has booted. Implementations of this method should set the 'bootable_time' attribute and the 'hostname' attribute. """ raise NotImplementedError() def _WaitForSSH(self, ip_address: Union[str, None] = None): """Waits until VM is ready. Implementations of this method should set the 'hostname' attribute. Args: ip_address: The IP address to use for SSH, if None an implementation default is used. """ raise NotImplementedError() def VMLastBootTime(self): """Returns the time the VM was last rebooted as reported by the VM.""" raise NotImplementedError() def OnStartup(self): """Performs any necessary setup on the VM specific to the OS. This will be called once immediately after the VM has booted. """ events.on_vm_startup.send(vm=self) # Resets the cached SMT enabled status and number cpus value. self._is_smt_enabled = None self.num_cpus = None def RecordAdditionalMetadata(self): """After the VM has been prepared, store VM metadata.""" # Skip metadata capture if the VM lacks a boot time, indicating that prior # VM connection attempts failed. if not self.bootable_time: return if self.num_cpus is None: self.num_cpus = self._GetNumCpus() def PrepareVMEnvironment(self): """Performs any necessary setup on the VM specific to the OS. This will be called once after setting up scratch disks. """ if self.disable_interrupt_moderation: self.DisableInterruptModeration() if self.disable_rss: self.DisableRSS() def DisableInterruptModeration(self): """Disables interrupt moderation on the VM.""" raise NotImplementedError() def DisableRSS(self): """Disables RSS on the VM.""" raise NotImplementedError() def Install(self, package_name): """Installs a PerfKit package on the VM.""" raise NotImplementedError() def InstallPackages(self, packages: str) -> None: """Installs packages using the OS's package manager.""" pass def Uninstall(self, package_name): """Uninstalls a PerfKit package on the VM.""" raise NotImplementedError() def PackageCleanup(self): """Cleans up all installed packages. Deletes the temp directory, restores packages, and uninstalls all PerfKit packages. """ raise NotImplementedError() def SetupLocalDisks(self): """Perform OS specific setup on any local disks that exist.""" pass def LogVmDebugInfo(self): """Logs OS-specific debug info. Must be overridden on an OS mixin.""" pass def PushFile(self, source_path, remote_path=''): """Copies a file or a directory to the VM. Args: source_path: The location of the file or directory on the LOCAL machine. remote_path: The destination of the file on the REMOTE machine, default is the home directory. """ self.RemoteCopy(source_path, remote_path) def PullFile(self, local_path, remote_path): """Copies a file or a directory from the VM to the local machine. Args: local_path: string. The destination path of the file or directory on the local machine. remote_path: string. The source path of the file or directory on the remote machine. """ self.RemoteCopy(local_path, remote_path, copy_to=False) def WriteTemporaryFile(self, file_contents: str) -> str: """Writes a temporary file to the VM. Args: file_contents: The contents of the file. Returns: The full filename. """ name = '/tmp/' + str(uuid.uuid4())[-8:] vm_util.CreateRemoteFile(self, file_contents, name) return name def PrepareResourcePath(self, resource_name, search_user_paths=True) -> str: """Prepares a resource from local loaders & returns path on machine. Loaders are searched in order until the resource is found. If no loader provides 'resource_name', an exception is thrown. If 'search_user_paths' is true, the directories specified by "--data_search_paths" are consulted before the default paths. Args: resource_name: string. Name of a resource. search_user_paths: boolean. Whether paths from "--data_search_paths" should be searched before the default paths. Returns: A path to the resource on the local or remote machine's filesystem. Raises: ResourceNotFound: When resource was not found. """ local_path = data.ResourcePath(resource_name, search_user_paths) destination_path = '/tmp/' + resource_name self.PushFile(local_path, destination_path) return destination_path def PushDataFile(self, data_file, remote_path='', should_double_copy=None): """Upload a file in perfkitbenchmarker.data directory to the VM. Args: data_file: The filename of the file to upload. remote_path: The destination for 'data_file' on the VM. If not specified, the file will be placed in the user's home directory. should_double_copy: Indicates whether to first copy to the home directory Raises: perfkitbenchmarker.data.ResourceNotFound: if 'data_file' does not exist. """ file_path = data.ResourcePath(data_file) if should_double_copy: home_file_path = '~/' + data_file self.PushFile(file_path, home_file_path) copy_cmd = ' '.join(['cp', home_file_path, remote_path]) self.RemoteCommand(copy_cmd) else: self.PushFile(file_path, remote_path) def RenderTemplate(self, template_path, remote_path, context): """Renders a local Jinja2 template and copies it to the remote host. The template will be provided variables defined in 'context', as well as a variable named 'vm' referencing this object. Args: template_path: string. Local path to jinja2 template. remote_path: string. Remote path for rendered file on the remote vm. context: dict. Variables to pass to the Jinja2 template during rendering. Raises: jinja2.UndefinedError: if template contains variables not present in 'context'. RemoteCommandError: If there was a problem copying the file. """ with open(template_path) as fp: template_contents = fp.read() environment = jinja2.Environment(undefined=jinja2.StrictUndefined) template = environment.from_string(template_contents) prefix = 'pkb-' + os.path.basename(template_path) with vm_util.NamedTemporaryFile( prefix=prefix, dir=vm_util.GetTempDir(), delete=False, mode='w' ) as tf: tf.write(template.render(vm=self, **context)) tf.close() self.RemoteCopy(tf.name, remote_path) def DiskCreatedOnVMCreation(self, data_disk): """Returns whether the disk has been created during VM creation.""" return data_disk.disk_type == disk.LOCAL def _CreateScratchDiskFromDisks(self, disk_spec, disks): """Helper method to create scratch data disks. Given a list of BaseDisk objects, create and attach scratch disk to VM. Multiple BaseDisk objects are striped and combined into one 'logical' data disk and treated as a single disk from a benchmarks perspective. Args: disk_spec: The BaseDiskSpec object corresponding to the disk. disks: A list of the disk(s) to be created, attached, and striped. Returns: The created scratch disk. """ if len(disks) > 1: # If the disk_spec called for a striped disk, create one. scratch_disk = disk.StripedDisk(disk_spec, disks) else: scratch_disk = disks[0] if not self.DiskCreatedOnVMCreation(scratch_disk): scratch_disk.Create() scratch_disk.Attach(self) return scratch_disk def PrepareScratchDisk(self, scratch_disk, disk_spec): """Expose internal prepare scratch disk.""" self._PrepareScratchDisk(scratch_disk, disk_spec) def _PrepareScratchDisk(self, scratch_disk, disk_spec): """Helper method to format and mount scratch disk. Args: scratch_disk: Scratch disk to be formatted and mounted. disk_spec: The BaseDiskSpec object corresponding to the disk. """ raise NotImplementedError() def _SetNumCpus(self): """Sets the number of CPUs on the VM. Returns: The number of CPUs on the VM. """ if self.num_cpus is None: self.num_cpus = self._GetNumCpus() return self.num_cpus def NumCpusForBenchmark(self, report_only_physical_cpus=False): """Gets the number of CPUs for benchmark configuration purposes. Many benchmarks scale their configurations based off of the number of CPUs available on the system (e.g. determine the number of threads). Benchmarks should use this property rather than num_cpus so that users can override that behavior with the --num_cpus_override flag. Not all benchmarks may use this, and they may use more than this number of CPUs. To actually ensure that they are not being used during benchmarking, the CPUs should be disabled. Args: report_only_physical_cpus: Whether to report only the physical (non-SMT) CPUs. Default is to report all vCPUs. Returns: The number of CPUs for benchmark configuration purposes. """ if FLAGS.num_cpus_override: return FLAGS.num_cpus_override if report_only_physical_cpus and self.IsSmtEnabled(): # return half the number of CPUs. return self.num_cpus // 2 if self.num_disable_cpus: return self.num_cpus - self.num_disable_cpus return self.num_cpus def _GetNumCpus(self): """Returns the number of logical CPUs on the VM. This method does not cache results (unlike "num_cpus"). """ raise NotImplementedError() @property def total_free_memory_kb(self): """Gets the amount of free memory on the VM. Returns: The number of kilobytes of memory on the VM. """ return self._GetTotalFreeMemoryKb() @property def total_memory_kb(self): """Gets the amount of memory on the VM. Returns: The number of kilobytes of memory on the VM. """ if not self._total_memory_kb: self._total_memory_kb = self._GetTotalMemoryKb() return self._total_memory_kb def _GetTotalFreeMemoryKb(self): """Returns the amount of free physical memory on the VM in Kilobytes.""" raise NotImplementedError() def _GetTotalMemoryKb(self): """Returns the amount of physical memory on the VM in Kilobytes. This method does not cache results (unlike "total_memory_kb"). """ raise NotImplementedError() def IsReachable(self, target_vm): """Indicates whether the target VM can be reached from its internal ip. Args: target_vm: The VM whose reachability is being tested. Returns: True if the internal ip address of the target VM can be reached, false otherwise. """ if target_vm not in self._reachable: if target_vm.internal_ip: self._reachable[target_vm] = self._TestReachable(target_vm.internal_ip) else: self._reachable[target_vm] = False return self._reachable[target_vm] def _TestReachable(self, ip): """Returns True if the VM can reach the ip address and False otherwise.""" raise NotImplementedError() def StartBackgroundWorkload(self): """Start the background workload.""" for workload in background_workload.BACKGROUND_WORKLOADS: if workload.IsEnabled(self): if self.OS_TYPE in workload.EXCLUDED_OS_TYPES: raise NotImplementedError() workload.Start(self) def StopBackgroundWorkload(self): """Stop the background workoad.""" for workload in background_workload.BACKGROUND_WORKLOADS: if workload.IsEnabled(self): if self.OS_TYPE in workload.EXCLUDED_OS_TYPES: raise NotImplementedError() workload.Stop(self) def PrepareBackgroundWorkload(self): """Prepare for the background workload.""" for workload in background_workload.BACKGROUND_WORKLOADS: if workload.IsEnabled(self): if self.OS_TYPE in workload.EXCLUDED_OS_TYPES: raise NotImplementedError() workload.Prepare(self) def SetReadAhead(self, num_sectors, devices): """Set read-ahead value for block devices. Args: num_sectors: int. Number of sectors of read ahead. devices: list of strings. A list of block devices. """ raise NotImplementedError() def GetSha256sum(self, path, filename): """Gets the sha256sum hash for a filename in a path on the VM. Args: path: string; Path on the VM. filename: string; Name of the file in the path. Returns: string; The sha256sum hash. """ raise NotImplementedError() def RecoverChunkedPreprovisionedData(self, path, filename): """Recover chunked preprovisioned data.""" raise NotImplementedError() def CheckPreprovisionedData( self, install_path, module_name, filename, expected_sha256 ): """Checks preprovisioned data for a checksum. Checks the expected 256sum against the actual sha256sum. Called after the file is downloaded. This function should be overridden by each OS-specific MixIn. Args: install_path: The install path on this VM. The benchmark is installed at this path in a subdirectory of the benchmark name. module_name: Name of the benchmark associated with this data file. filename: The name of the file that was downloaded. expected_sha256: The expected sha256 checksum value. """ actual_sha256 = self.GetSha256sum(install_path, filename) if actual_sha256 != expected_sha256: raise errors.Setup.BadPreprovisionedDataError( 'Invalid sha256sum for %s/%s: %s (actual) != %s (expected). Might ' 'want to run using --preprovision_ignore_checksum ' '(not recommended).' % (module_name, filename, actual_sha256, expected_sha256) ) def TestConnectRemoteAccessPort(self, port=None, socket_timeout=0.5): """Tries to connect to remote access port and throw if it fails. Args: port: Integer of the port to connect to. Defaults to the default remote connection port of the VM. socket_timeout: The number of seconds to wait on the socket before failing and retrying. If this is too low, the connection may never succeed. If this is too high it will add latency (because the current connection may fail after a time that a new connection would succeed). Defaults to 500ms. """ if not port: port = self.primary_remote_access_port # TODO(user): refactor to reuse sockets? with contextlib.closing( socket.socket(socket.AF_INET, socket.SOCK_STREAM) ) as sock: # Before the IP is reachable the socket times out (and throws). After that # it throws immediately. sock.settimeout(socket_timeout) # seconds sock.connect((self.GetConnectionIp(), port)) logging.info('Connected to port %s on %s', port, self) def IsSmtEnabled(self): """Whether simultaneous multithreading (SMT) is enabled on the vm.""" if self._is_smt_enabled is None: self._is_smt_enabled = self._IsSmtEnabled() return self._is_smt_enabled def _IsSmtEnabled(self): """Whether SMT is enabled on the vm.""" raise NotImplementedError() # TODO(pclay): Implement on Windows, make abstract and non Optional @property def cpu_arch(self) -> str | None: """The basic CPU architecture of the VM.""" return None def GetCPUVersion(self) -> str | None: """Get the CPU version of the VM.""" return None def GenerateAndCaptureLogs(self) -> list[str]: """Generates and captures logs from the VM.""" return [] class DeprecatedOsMixin(BaseOsMixin): """Class that adds a deprecation log message to OsBasedVms.""" # The time or version in which this OS input will be removed END_OF_LIFE = None # Optional alternative to use instead. ALTERNATIVE_OS = None def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) assert self.OS_TYPE assert self.END_OF_LIFE warning = "os_type '%s' is deprecated and will be removed after %s." % ( self.OS_TYPE, self.END_OF_LIFE, ) if self.ALTERNATIVE_OS: warning += " Use '%s' instead." % self.ALTERNATIVE_OS logging.warning(warning)