perfkitbenchmarker/virtual_machine.py (805 lines of code) (raw):

# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Class to represent a Virtual Machine object. All VM specifics are self-contained and the class provides methods to operate on the VM: boot, shutdown, etc. """ import abc import copy import enum import logging import os.path import threading import typing from typing import Any, Dict, List from absl import flags from perfkitbenchmarker import benchmark_lookup from perfkitbenchmarker import data from perfkitbenchmarker import disk from perfkitbenchmarker import disk_strategies from perfkitbenchmarker import errors from perfkitbenchmarker import flags as pkb_flags from perfkitbenchmarker import os_mixin from perfkitbenchmarker import os_types from perfkitbenchmarker import package_lookup from perfkitbenchmarker import provider_info from perfkitbenchmarker import resource from perfkitbenchmarker import vm_util from perfkitbenchmarker.configs import option_decoders from perfkitbenchmarker.configs import spec FLAGS = flags.FLAGS DEFAULT_USERNAME = 'perfkit' QUOTA_EXCEEDED_MESSAGE = 'Creation failed due to quota exceeded: ' PREPROVISIONED_DATA_TIMEOUT = 600 def ValidateVmMetadataFlag(options_list): """Verifies correct usage of the vm metadata flag. All provided options must be in the form of key:value. Args: options_list: A list of strings parsed from the provided value for the flag. Returns: True if the list of options provided as the value for the flag meets requirements. Raises: flags.ValidationError: If the list of options provided as the value for the flag does not meet requirements. """ for option in options_list: if ':' not in option[1:-1]: raise flags.ValidationError( '"%s" not in expected key:value format' % option ) return True # vm_metadata flag name VM_METADATA = 'vm_metadata' flags.DEFINE_boolean( 'dedicated_hosts', False, 'If True, use hosts that only have VMs from the same ' 'benchmark running on them.', ) flags.DEFINE_integer( 'num_vms_per_host', None, 'The number of VMs per dedicated host. If None, VMs will be packed on a ' 'single host until no more can be packed at which point a new host will ' 'be created.', ) flags.DEFINE_integer( 'num_cpus_override', None, 'Rather than detecting the number of CPUs present on the machine, use this ' 'value if set. Some benchmarks will use this number to automatically ' 'scale their configurations; this can be used as a method to control ' 'benchmark scaling. It will also change the num_cpus metadata ' 'published along with the benchmark data.', ) flags.DEFINE_list( VM_METADATA, [], 'Metadata to add to the vm. It expectskey:value pairs.' ) flags.register_validator(VM_METADATA, ValidateVmMetadataFlag) flags.DEFINE_bool( 'skip_firewall_rules', False, 'If set, this run will not create firewall rules. This is useful if the ' 'user project already has all of the firewall rules in place and/or ' 'creating new ones is expensive', ) flags.DEFINE_bool( 'preprovision_ignore_checksum', False, 'Ignore checksum verification for preprovisioned data. ' 'Not recommended, please use with caution', ) flags.DEFINE_boolean( 'connect_via_internal_ip', False, 'Whether to use internal IP addresses for running commands on and pushing ' 'data to VMs. By default, PKB interacts with VMs using external IP ' 'addresses.', ) _ASSIGN_EXTERNAL_IP = flags.DEFINE_boolean( 'assign_external_ip', True, 'If True, an external (public) IP will be created for VMs. ' 'If False, --connect_via_internal_ip may also be needed.', ) flags.DEFINE_string( 'boot_startup_script', None, ( 'Startup script to run during boot. ' 'Requires provider support, only implemented for Linux VMs ' 'on GCP, AWS, Azure for now.' ), ) _VM_INSTANCE_NAME_SUFFIX = flags.DEFINE_string( 'vm_instance_name_suffix', None, ( 'Optional, a suffix to add after the VM instance name. Without this,' ' instance is named as pkb-{run_uri}-{instance_number}. With this' ' option, the instance name will be' ' pkb-{run_uri}-{instance_number}-{vm_instance_name_suffix}' ), ) _VM_INSTANCE_NAME_SUFFIXES = flags.DEFINE_list( 'vm_instance_name_suffixes', None, ( 'Optional, the list of suffixes to add after the VM instance name.' ' Without this, instance is named as pkb-{run_uri}-{instance_number}.' ' With this option, the instance name will be' ' pkb-{run_uri}-{vm_instance_name_suffixes[instance_number]}.' ' The number of suffixes must be equal to the total number of VMs' ' (servers and clients).' ), ) _VM_NAME_PREFIX = flags.DEFINE_string( 'vm_name_prefix', None, ( 'Optional, a prefix to add after the VM instance name. Without this,' ' instance is named as pkb-{run_uri}-{instance_number}. With this' ' option, the instance name will be' ' {prefix}-{instance_number}-{vm_instance_name_suffix}' ), ) _REQUIRED_CPU_VERSION = flags.DEFINE_string( 'required_cpu_version', None, 'The required CPU version. Benchmark will fail if CPU does not match.' ' The version is defined as the fields in /proc/cpuinfo that define the cpu' ' version joined by underscores. On x86, the fields are (Vendor ID,' ' cpu family, Model, Stepping) (e.g. GenuineIntel_6_143_8). On ARM, the' ' fields are (CPU implementer, CPU architecture, CPU variant, CPU part).' ' This is only useful when a machine type comprises of multiple CPU' ' versions.', ) _REQUIRED_CPU_VERSION_RETRIES = flags.DEFINE_integer( 'required_cpu_version_retries', 5, 'The number of times to retry if the required CPU version does not match.', ) _VM_USER_NAME = flags.DEFINE_string( 'vm_user_name', DEFAULT_USERNAME, 'The user name to use for the VM. If not specified, the default user name' ' "perfkit" is used.', ) SKIP_VM_PREPARATION = flags.DEFINE_bool( 'skip_vm_preparation', False, 'Skips the PrepareAfterBoot step for VMs.', ) @enum.unique class BootCompletionIpSubset(enum.Enum): DEFAULT = enum.auto() EXTERNAL = enum.auto() INTERNAL = enum.auto() BOTH = enum.auto() _BOOT_COMPLETION_IP_SUBSET = flags.DEFINE_enum_class( 'boot_completion_ip_subset', BootCompletionIpSubset.DEFAULT, BootCompletionIpSubset, ( 'The ip(s) to use to measure BootCompletion. If DEFAULT, determined' ' based on --connect_via_internal_ip.' ), ) # Deprecated. Use connect_via_internal_ip. flags.DEFINE_boolean( 'ssh_via_internal_ip', False, 'Whether to use internal IP addresses for running commands on and pushing ' 'data to VMs. By default, PKB interacts with VMs using external IP ' 'addresses.', ) flags.DEFINE_boolean( 'retry_on_rate_limited', True, 'Whether to retry commands when rate limited.', ) GPU_K80 = 'k80' GPU_P100 = 'p100' GPU_V100 = 'v100' GPU_A100 = 'a100' GPU_H100 = 'h100' GPU_P4 = 'p4' GPU_P4_VWS = 'p4-vws' GPU_T4 = 't4' GPU_L4 = 'l4' GPU_A10 = 'a10' TESLA_GPU_TYPES = [ GPU_K80, GPU_P100, GPU_V100, GPU_A100, GPU_P4, GPU_P4_VWS, GPU_T4, GPU_A10, ] VALID_GPU_TYPES = TESLA_GPU_TYPES + [GPU_L4, GPU_H100] CPUARCH_X86_64 = 'x86_64' CPUARCH_AARCH64 = 'aarch64' GPU_COUNT = flags.DEFINE_integer( 'gpu_count', None, 'Number of gpus to attach to the VM. Requires gpu_type to be specified.', ) GPU_TYPE = flags.DEFINE_enum( 'gpu_type', None, VALID_GPU_TYPES, 'Type of gpus to attach to the VM. Requires gpu_count to be specified.', ) def GetVmSpecClass(cloud: str, platform: str | None = None): """Returns the VmSpec class with corresponding attributes. Args: cloud: The cloud being used. platform: The vm platform being used (see enum above). If not provided, defaults to flag value. Returns: A child of BaseVmSpec with the corresponding attributes. """ if platform is None: platform = provider_info.VM_PLATFORM.value return spec.GetSpecClass(BaseVmSpec, CLOUD=cloud, PLATFORM=platform) def GetVmClass(cloud: str, os_type: str, platform: str | None = None): """Returns the VM class with corresponding attributes. Args: cloud: The cloud being used. os_type: The os type of the VM. platform: The vm platform being used (see enum above). If not provided, defaults to flag value. Returns: A child of BaseVirtualMachine with the corresponding attributes. """ if platform is None: platform = provider_info.VM_PLATFORM.value return resource.GetResourceClass( BaseVirtualMachine, CLOUD=cloud, OS_TYPE=os_type, PLATFORM=platform, ) class BaseVmSpec(spec.BaseSpec): """Storing various data about a single vm. Attributes: zone: The region / zone the in which to launch the VM. cidr: The CIDR subnet range in which to launch the VM. machine_type: The provider-specific instance type (e.g. n1-standard-8). gpu_count: None or int. Number of gpus to attach to the VM. gpu_type: None or string. Type of gpus to attach to the VM. image: The disk image to boot from. assign_external_ip: Bool. If true, create an external (public) IP. install_packages: If false, no packages will be installed. This is useful if benchmark dependencies have already been installed. background_cpu_threads: The number of threads of background CPU usage while running the benchmark. background_network_mbits_per_sec: The number of megabits per second of background network traffic during the benchmark. background_network_ip_type: The IP address type (INTERNAL or EXTERNAL) to use for generating background network workload. disable_interrupt_moderation: If true, disables interrupt moderation. disable_rss: = If true, disables rss. boot_startup_script: Startup script to run during boot. vm_metadata: = Additional metadata for the VM. """ SPEC_TYPE = 'BaseVmSpec' SPEC_ATTRS = ['CLOUD', 'PLATFORM'] CLOUD = None PLATFORM = provider_info.DEFAULT_VM_PLATFORM def __init__(self, *args, **kwargs): self.zone = None self.cidr = None self.machine_type: str self.gpu_count = None self.gpu_type = None self.image = None self.assign_external_ip = None self.install_packages = None self.background_cpu_threads = None self.background_network_mbits_per_sec = None self.background_network_ip_type = None self.disable_interrupt_moderation = None self.disable_rss = None self.vm_metadata: Dict[str, Any] = None self.boot_startup_script: str = None self.internal_ip: str super().__init__(*args, **kwargs) @classmethod def _ApplyFlags(cls, config_values, flag_values): """Overrides config values with flag values. Can be overridden by derived classes to add support for specific flags. Args: config_values: dict mapping config option names to provided values. Is modified by this function. flag_values: flags.FlagValues. Runtime flags that may override the provided config values. Returns: dict mapping config option names to values derived from the config values or flag values. """ super()._ApplyFlags(config_values, flag_values) if flag_values['image'].present: config_values['image'] = flag_values.image if flag_values['install_packages'].present: config_values['install_packages'] = flag_values.install_packages if flag_values['machine_type'].present: config_values['machine_type'] = flag_values.machine_type if flag_values['background_cpu_threads'].present: config_values['background_cpu_threads'] = ( flag_values.background_cpu_threads ) if flag_values['background_network_mbits_per_sec'].present: config_values['background_network_mbits_per_sec'] = ( flag_values.background_network_mbits_per_sec ) if flag_values['background_network_ip_type'].present: config_values['background_network_ip_type'] = ( flag_values.background_network_ip_type ) if flag_values['dedicated_hosts'].present: config_values['use_dedicated_host'] = flag_values.dedicated_hosts if flag_values['num_vms_per_host'].present: config_values['num_vms_per_host'] = flag_values.num_vms_per_host if flag_values['gpu_type'].present: config_values['gpu_type'] = flag_values.gpu_type if flag_values['gpu_count'].present: config_values['gpu_count'] = flag_values.gpu_count if flag_values['assign_external_ip'].present: config_values['assign_external_ip'] = flag_values.assign_external_ip if flag_values['disable_interrupt_moderation'].present: config_values['disable_interrupt_moderation'] = ( flag_values.disable_interrupt_moderation ) if flag_values['disable_rss'].present: config_values['disable_rss'] = flag_values.disable_rss if flag_values['vm_metadata'].present: config_values['vm_metadata'] = flag_values.vm_metadata if flag_values['boot_startup_script'].present: config_values['boot_startup_script'] = flag_values.boot_startup_script if 'gpu_count' in config_values and 'gpu_type' not in config_values: raise errors.Config.MissingOption( 'gpu_type must be specified if gpu_count is set' ) if 'gpu_type' in config_values and 'gpu_count' not in config_values: raise errors.Config.MissingOption( 'gpu_count must be specified if gpu_type is set' ) @classmethod def _GetOptionDecoderConstructions(cls): """Gets decoder classes and constructor args for each configurable option. Can be overridden by derived classes to add options or impose additional requirements on existing options. Returns: dict. Maps option name string to a (ConfigOptionDecoder class, dict) pair. The pair specifies a decoder class and its __init__() keyword arguments to construct in order to decode the named option. """ result = super()._GetOptionDecoderConstructions() result.update({ 'disable_interrupt_moderation': ( option_decoders.BooleanDecoder, {'default': False}, ), 'disable_rss': (option_decoders.BooleanDecoder, {'default': False}), 'image': ( option_decoders.StringDecoder, {'none_ok': True, 'default': None}, ), 'install_packages': ( option_decoders.BooleanDecoder, {'default': True}, ), 'machine_type': ( option_decoders.StringDecoder, {'none_ok': True, 'default': None}, ), 'assign_external_ip': ( option_decoders.BooleanDecoder, {'default': True}, ), 'gpu_type': ( option_decoders.EnumDecoder, {'valid_values': VALID_GPU_TYPES, 'default': None}, ), 'gpu_count': ( option_decoders.IntDecoder, {'min': 1, 'default': None}, ), 'zone': ( option_decoders.StringDecoder, {'none_ok': True, 'default': None}, ), 'cidr': ( option_decoders.StringDecoder, {'none_ok': True, 'default': None}, ), 'use_dedicated_host': ( option_decoders.BooleanDecoder, {'default': False}, ), 'num_vms_per_host': (option_decoders.IntDecoder, {'default': None}), 'background_network_mbits_per_sec': ( option_decoders.IntDecoder, {'none_ok': True, 'default': None}, ), 'background_network_ip_type': ( option_decoders.EnumDecoder, { 'default': vm_util.IpAddressSubset.EXTERNAL, 'valid_values': [ vm_util.IpAddressSubset.EXTERNAL, vm_util.IpAddressSubset.INTERNAL, ], }, ), 'background_cpu_threads': ( option_decoders.IntDecoder, {'none_ok': True, 'default': None}, ), 'boot_startup_script': ( option_decoders.StringDecoder, {'none_ok': True, 'default': None}, ), 'vm_metadata': ( option_decoders.ListDecoder, { 'item_decoder': option_decoders.StringDecoder(), 'default': [], }, ), }) return result class BaseVirtualMachine(os_mixin.BaseOsMixin, resource.BaseResource): """Base class for Virtual Machines. This class holds VM methods and attributes relating to the VM as a cloud resource. For methods and attributes that interact with the VM's guest OS, see BaseOsMixin and its subclasses. Attributes: image: The disk image used to boot. internal_ip: Internal IP address. assign_external_ip: If True, create an external (public) IP. ip_address: Public (external) IP address. machine_type: The provider-specific instance type (e.g. n1-standard-8). project: The provider-specific project associated with the VM (e.g. artisanal-lightbulb-883). ssh_public_key: Path to SSH public key file. ssh_private_key: Path to SSH private key file. user_name: Account name for login. the contents of 'ssh_public_key' should be in .ssh/authorized_keys for this user. zone: The region / zone the VM was launched in. cidr: The CIDR range the VM was launched in. disk_specs: list of BaseDiskSpec objects. Specifications for disks attached to the VM. scratch_disks: list of BaseDisk objects. Scratch disks attached to the VM. max_local_disks: The number of local disks on the VM that can be used as scratch disks or that can be striped together. background_cpu_threads: The number of threads of background CPU usage while running the benchmark. background_network_mbits_per_sec: Number of mbits/sec of background network usage while running the benchmark. background_network_ip_type: Type of IP address to use for generating background network workload vm_group: The VM group this VM is associated with, if applicable. create_operation_name: The name of a VM's create command operation, used to poll the operation in WaitUntilRunning. create_return_time: The time at which a VM's create command returns. is_running_time: The time at which the VM entered the running state. """ is_static = False RESOURCE_TYPE = 'BaseVirtualMachine' REQUIRED_ATTRS = ['CLOUD', 'OS_TYPE', 'PLATFORM'] # TODO(user): Define this value in all individual children. PLATFORM = provider_info.DEFAULT_VM_PLATFORM _instance_counter_lock = threading.Lock() _instance_counter = 0 def __init__(self, vm_spec: BaseVmSpec): """Initialize BaseVirtualMachine class. Args: vm_spec: virtual_machine.BaseVmSpec object of the vm. """ super().__init__() with self._instance_counter_lock: self.instance_number = self._instance_counter name_prefix = ( _VM_NAME_PREFIX.value if _VM_NAME_PREFIX.value else f'pkb-{FLAGS.run_uri}' ) if _VM_INSTANCE_NAME_SUFFIX.value: self.name = '%s-%d-%s' % ( name_prefix, self.instance_number, _VM_INSTANCE_NAME_SUFFIX.value, ) elif _VM_INSTANCE_NAME_SUFFIXES.value: self.name = '%s-%s' % ( name_prefix, _VM_INSTANCE_NAME_SUFFIXES.value[self.instance_number], ) else: self.name = '%s-%d' % (name_prefix, self.instance_number) BaseVirtualMachine._instance_counter += 1 self.disable_interrupt_moderation = vm_spec.disable_interrupt_moderation self.disable_rss = vm_spec.disable_rss self.zone = vm_spec.zone self.cidr = vm_spec.cidr self.machine_type = vm_spec.machine_type self.gpu_count = vm_spec.gpu_count self.gpu_type = vm_spec.gpu_type self.image = vm_spec.image self.install_packages = vm_spec.install_packages self.can_connect_via_internal_ip = ( FLAGS.ssh_via_internal_ip or FLAGS.connect_via_internal_ip ) self.boot_completion_ip_subset = _BOOT_COMPLETION_IP_SUBSET.value self.assign_external_ip = vm_spec.assign_external_ip self.ip_address = None self.internal_ip = None self.internal_ips = [] self.user_name = _VM_USER_NAME.value self.ssh_public_key = vm_util.GetPublicKeyPath() self.ssh_private_key = vm_util.GetPrivateKeyPath() self.disk_specs = [] self.scratch_disks = [] self.max_local_disks = 0 self.local_disk_counter = 0 self.remote_disk_counter = 0 self.host = None self.background_cpu_threads = vm_spec.background_cpu_threads self.background_network_mbits_per_sec = ( vm_spec.background_network_mbits_per_sec ) self.background_network_ip_type = vm_spec.background_network_ip_type self.use_dedicated_host = None self.num_vms_per_host = None self.network = None self.firewall = None self.tcp_congestion_control = None self.numa_node_count = None self.num_disable_cpus = None self.capacity_reservation_id = None self.vm_metadata = dict(item.split(':', 1) for item in vm_spec.vm_metadata) self.vm_group = None self.id = None self.is_aarch64 = False self.cpu_version = None self.create_operation_name = None self.create_return_time = None self.is_running_time = None self.boot_startup_script = vm_spec.boot_startup_script if self.OS_TYPE == os_types.CORE_OS and self.boot_startup_script: raise errors.Setup.InvalidConfigurationError( 'Startup script are not supported on CoreOS.' ) @property @classmethod @abc.abstractmethod def CLOUD(cls): raise NotImplementedError() def __repr__(self): return '<BaseVirtualMachine [ip={}, internal_ip={}]>'.format( self.ip_address, self.internal_ip ) def __str__(self): if self.ip_address: return self.ip_address return super().__str__() def GetConnectionIp(self): """Gets the IP to use for connecting to the VM.""" if not self.created: raise errors.VirtualMachine.VirtualMachineError( 'VM was not properly created, but PKB is attempting to connect to ' 'it anyways. Caller should guard against VM not being created.' ) if self.can_connect_via_internal_ip: return self.internal_ip if not self.ip_address: raise errors.VirtualMachine.VirtualMachineError( f'{self.name} does not have a (public) ip_address. Do you need to' ' specify --connect_via_internal_ip?' ) return self.ip_address def GetInternalIPs(self): """Gets the Internal IP's for the VM.""" if self.internal_ips: return self.internal_ips elif self.internal_ip: return [self.internal_ip] return [] def SetDiskSpec(self, disk_spec, disk_count): """Set Disk Specs of the current VM.""" # This method will be depreciate soon. if disk_spec.disk_type == disk.LOCAL and disk_count is None: disk_count = self.max_local_disks self.disk_specs = [copy.copy(disk_spec) for _ in range(disk_count)] # In the event that we need to create multiple disks from the same # DiskSpec, we need to ensure that they have different mount points. if disk_count > 1 and disk_spec.mount_point: for i, vm_disk_spec in enumerate(self.disk_specs): vm_disk_spec.mount_point += str(i) def SetupAllScratchDisks(self): """Set up all scratch disks of the current VM.""" # This method will be depreciate soon. # Prepare vm scratch disks: if any(spec.disk_type == disk.RAM for spec in self.disk_specs): disk_strategies.SetUpRamDiskStrategy(self, self.disk_specs[0]).SetUpDisk() return if any(spec.disk_type == disk.NFS for spec in self.disk_specs): disk_strategies.SetUpNFSDiskStrategy(self, self.disk_specs[0]).SetUpDisk() return if any(spec.disk_type == disk.SMB for spec in self.disk_specs): disk_strategies.SetUpSMBDiskStrategy(self, self.disk_specs[0]).SetUpDisk() return if any(spec.disk_type == disk.LOCAL for spec in self.disk_specs): self.SetupLocalDisks() for disk_spec_id, disk_spec in enumerate(self.disk_specs): self.CreateScratchDisk(disk_spec_id, disk_spec) # TODO(user): Simplify disk logic. if disk_spec.num_striped_disks > 1: # scratch disks has already been created and striped together. break # This must come after Scratch Disk creation to support the # Containerized VM case def CreateScratchDisk(self, disk_spec_id, disk_spec): """Create a VM's scratch disk. Args: disk_spec_id: Deterministic order of this disk_spec in the VM's list of disk_specs. disk_spec: virtual_machine.BaseDiskSpec object of the disk. """ pass def DeleteScratchDisks(self): """Delete a VM's scratch disks.""" for scratch_disk in self.scratch_disks: if scratch_disk.disk_type != disk.LOCAL: scratch_disk.Delete() def GetScratchDir(self, disk_num=0): """Gets the path to the scratch directory. Args: disk_num: The number of the disk to mount. Returns: The mounted disk directory. """ if disk_num >= len(self.scratch_disks): raise errors.Error( 'GetScratchDir(disk_num=%s) is invalid, max disk_num is %s' % (disk_num, len(self.scratch_disks)) ) return self.scratch_disks[disk_num].mount_point def CreateAndBoot(self): """Creates a single VM and waits for boot to complete. This is done repeatedly to get --required_cpu_version if it is set. """ def CreateAndBootOnce(): self.Create() logging.info('VM: %s (%s)', self.ip_address, self.internal_ip) logging.info('Waiting for boot completion.') self.AllowRemoteAccessPorts() self.WaitForBootCompletion() self.cpu_version = self.GetCPUVersion() if ( _REQUIRED_CPU_VERSION.value and _REQUIRED_CPU_VERSION.value != self.cpu_version ): self.Delete() raise errors.Resource.RetryableCreationError( f'Guest arch {self.cpu_version} is not enforced guest arch' f' {_REQUIRED_CPU_VERSION.value}. Deleting VM and scratch disk and' ' recreating.', ) try: vm_util.Retry( max_retries=_REQUIRED_CPU_VERSION_RETRIES.value, retryable_exceptions=errors.Resource.RetryableCreationError, )(CreateAndBootOnce)() except vm_util.RetriesExceededRetryError as exc: raise errors.Benchmarks.InsufficientCapacityCloudFailure( f'{_REQUIRED_CPU_VERSION.value} was not obtained after' f' {_REQUIRED_CPU_VERSION_RETRIES.value} retries.' ) from exc def PrepareAfterBoot(self): """Prepares a VM after it has booted. This function will prepare a scratch disk if required. Raises: Exception: If --vm_metadata is malformed. """ self.AddMetadata() self.OnStartup() self.SetupAllScratchDisks() self.PrepareVMEnvironment() self.RecordAdditionalMetadata() def AllowIcmp(self): """Opens ICMP protocol on the firewall corresponding to the VM if exists.""" if self.firewall and not FLAGS.skip_firewall_rules: self.firewall.AllowIcmp(self) def AllowPort(self, start_port, end_port=None, source_range=None): """Opens the port on the firewall corresponding to the VM if one exists. Args: start_port: The first local port to open in a range. end_port: The last local port to open in a range. If None, only start_port will be opened. source_range: list of CIDRs. If none, all sources are allowed. """ if self.firewall and not FLAGS.skip_firewall_rules: self.firewall.AllowPort(self, start_port, end_port, source_range) def AllowRemoteAccessPorts(self): """Allow all ports in self.remote_access_ports.""" for port in self.remote_access_ports: self.AllowPort(port) def AddMetadata(self, **kwargs): """Add key/value metadata to the instance. Setting the metadata on create is preferred. If that is not possible, this method adds metadata in the form of key value pairs to the instance. Useful for debugging / introspection. The default implementation is a noop. Cloud providers supporting instance metadata should override. Args: **kwargs: dict. (tag name, tag value) pairs to set as metadata on the instance. """ pass def GetResourceMetadata(self): """Returns a dict containing VM metadata. Returns: dict mapping string property key to value. """ if not self.created: return {} result = self.metadata.copy() result.update({ 'image': self.image, 'zone': self.zone, 'cloud': self.CLOUD, 'os_type': type(self).OS_TYPE, }) if self.cidr is not None: result['cidr'] = self.cidr if self.machine_type is not None: result['machine_type'] = self.machine_type if self.use_dedicated_host is not None: result['dedicated_host'] = self.use_dedicated_host if self.num_vms_per_host is not None: result['num_vms_per_host'] = self.num_vms_per_host if self.tcp_congestion_control is not None: result['tcp_congestion_control'] = self.tcp_congestion_control if self.numa_node_count is not None: result['numa_node_count'] = self.numa_node_count if self.num_disable_cpus is not None: result['num_disable_cpus'] = self.num_disable_cpus if self.num_cpus is not None: result['num_cpus'] = self.num_cpus if ( self.num_cpus is not None and self.NumCpusForBenchmark() != self.num_cpus ): result['num_benchmark_cpus'] = self.NumCpusForBenchmark() # Some metadata is unique per VM. # Update publisher._VM_METADATA_TO_LIST to add more if self.id is not None: result['id'] = self.id if self.name is not None: result['name'] = self.name if self.ip_address is not None: result['ip_address'] = self.ip_address if pkb_flags.RECORD_PROCCPU.value and self.cpu_version: result['cpu_version'] = self.cpu_version if self.create_operation_name is not None: result['create_operation_name'] = self.create_operation_name if self.create_start_time: result['create_start_time'] = self.create_start_time return result def SimulateMaintenanceEvent(self): """Simulates a maintenance event on the VM.""" raise NotImplementedError() def SetupLMNotification(self): """Prepare environment for /scripts/gce_maintenance_notify.py script.""" raise NotImplementedError() def _GetLMNotifificationCommand(self): """Return Remote python execution command for LM notify script.""" raise NotImplementedError() def StartLMNotification(self): """Start meta-data server notification subscription.""" raise NotImplementedError() def WaitLMNotificationRelease(self): """Block main thread until LM ended.""" raise NotImplementedError() def CollectLMNotificationsTime(self): """Extract LM notifications from log file.""" raise NotImplementedError() def _InstallData( self, preprovisioned_data: Dict[str, str], module_name: str, filenames: List[str], install_path: str, fallback_url: Dict[str, str], timeout=PREPROVISIONED_DATA_TIMEOUT, ): """Installs preprovisioned_data on this VM. Args: preprovisioned_data: The dict mapping filenames to sha256sum hashes. module_name: The name of the module defining the preprovisioned data. filenames: An iterable of preprovisioned data filenames for a particular module. install_path: The path to download the data file. fallback_url: The dict mapping filenames to fallback url for downloading. timeout: The timeout for downloading the data file. Raises: errors.Setup.BadPreprovisionedDataError: If the module or filename are not defined with preprovisioned data, or if the sha256sum hash in the code does not match the sha256 of the file. """ for filename in filenames: if data.ResourceExists(filename): local_tar_file_path = data.ResourcePath(filename) self.PushFile(local_tar_file_path, install_path) continue url = fallback_url.get(filename) sha256sum = preprovisioned_data.get(filename) chunked = False try: # For extremely large files, we chunk into multiple files of 5GB each. # Named with [original_filename]_[index]].part, index starting at 000. # On Linux, run: # split -b 5G [filename] -d -a 3 --additional-suffix=.part filename_ # Recovery is done during checksum verification. preprovisioned = self.ShouldDownloadPreprovisionedData( module_name, filename ) if not preprovisioned: preprovisioned = self.ShouldDownloadPreprovisionedData( module_name, f'{filename}_000.part' ) if preprovisioned: chunked = True except NotImplementedError: logging.info( 'The provider does not implement ' 'ShouldDownloadPreprovisionedData. Attempting to ' 'download the data via URL' ) preprovisioned = False if not FLAGS.preprovision_ignore_checksum and not sha256sum: raise errors.Setup.BadPreprovisionedDataError( 'Cannot find sha256sum hash for file %s in module %s. Might want ' 'to run using --preprovision_ignore_checksum (not recommended). ' 'See README.md for information about preprovisioned data. ' 'Cannot find file in /data directory either, fail to upload from ' 'local directory.' % (filename, module_name) ) # TODO(yuyanting): Cleanup the logic, extract chunked download to a # separate function. if preprovisioned: if not chunked: self.DownloadPreprovisionedData( install_path, module_name, filename, timeout ) else: file_index = 0 while True: chunked_filename = f'{filename}_{file_index:03d}.part' # TODO(yuyanting): Use list + parallel download is probably faser. # But in case we do not have list permission. if not self.ShouldDownloadPreprovisionedData( module_name, chunked_filename ): break self.DownloadPreprovisionedData( install_path, module_name, chunked_filename, timeout ) file_index += 1 self.RecoverChunkedPreprovisionedData(install_path, filename) elif url: self.Install('wget') file_name = os.path.basename(url) self.RemoteCommand( 'wget -O {} {}'.format(os.path.join(install_path, file_name), url) ) else: raise errors.Setup.BadPreprovisionedDataError( 'Cannot find preprovisioned file %s inside preprovisioned bucket ' 'in module %s. See README.md for information about ' 'preprovisioned data. ' 'Cannot find fallback url of the file to download from web. ' 'Cannot find file in /data directory either, fail to upload from ' 'local directory.' % (filename, module_name) ) if not FLAGS.preprovision_ignore_checksum: self.CheckPreprovisionedData( install_path, module_name, filename, sha256sum ) def InstallPreprovisionedBenchmarkData( self, benchmark_name, filenames, install_path, timeout=PREPROVISIONED_DATA_TIMEOUT, ): """Installs preprovisioned benchmark data on this VM. Some benchmarks require importing many bytes of data into the virtual machine. This data can be staged in a particular cloud and the virtual machine implementation can override how the preprovisioned data is installed in the VM by overriding DownloadPreprovisionedData. For example, for GCP VMs, benchmark data can be preprovisioned in a GCS bucket that the VMs may access. For a benchmark that requires preprovisioned data, follow the instructions for that benchmark to download and store the data so that it may be accessed by a VM via this method. Before installing from preprovisioned data in the cloud, this function looks for files in the local data directory. If found, they are pushed to the VM. Otherwise, this function attempts to download them from their preprovisioned location onto the VM. Args: benchmark_name: The name of the benchmark defining the preprovisioned data. The benchmark's module must define the dict BENCHMARK_DATA mapping filenames to sha256sum hashes. filenames: An iterable of preprovisioned data filenames for a particular benchmark. install_path: The path to download the data file. timeout: The timeout for downloading the data file. Raises: errors.Setup.BadPreprovisionedDataError: If the benchmark or filename are not defined with preprovisioned data, or if the sha256sum hash in the code does not match the sha256sum of the file. """ benchmark_module = benchmark_lookup.BenchmarkModule(benchmark_name) if not benchmark_module: raise errors.Setup.BadPreprovisionedDataError( 'Cannot install preprovisioned data for undefined benchmark %s.' % benchmark_name ) try: # TODO(user): Change BENCHMARK_DATA to PREPROVISIONED_DATA. preprovisioned_data = benchmark_module.BENCHMARK_DATA except AttributeError: raise errors.Setup.BadPreprovisionedDataError( 'Benchmark %s does not define a BENCHMARK_DATA dict with ' 'preprovisioned data.' % benchmark_name ) fallback_url = getattr(benchmark_module, 'BENCHMARK_DATA_URL', {}) self._InstallData( preprovisioned_data, benchmark_name, filenames, install_path, fallback_url, timeout, ) def InstallPreprovisionedPackageData( self, package_name, filenames, install_path, timeout=PREPROVISIONED_DATA_TIMEOUT, ): """Installs preprovisioned Package data on this VM. Some benchmarks require importing many bytes of data into the virtual machine. This data can be staged in a particular cloud and the virtual machine implementation can override how the preprovisioned data is installed in the VM by overriding DownloadPreprovisionedData. For example, for GCP VMs, benchmark data can be preprovisioned in a GCS bucket that the VMs may access. For a benchmark that requires preprovisioned data, follow the instructions for that benchmark to download and store the data so that it may be accessed by a VM via this method. Before installing from preprovisioned data in the cloud, this function looks for files in the local data directory. If found, they are pushed to the VM. Otherwise, this function attempts to download them from their preprovisioned location onto the VM. Args: package_name: The name of the package file defining the preprovisoned data. The default value is None. If the package_name is provided, the package file must define the dict PREPROVISIONED_DATA mapping filenames to sha256sum hashes. filenames: An iterable of preprovisioned data filenames for a particular package. install_path: The path to download the data file. timeout: The timeout for downloading the data file. Raises: errors.Setup.BadPreprovisionedDataError: If the package or filename are not defined with preprovisioned data, or if the sha256sum hash in the code does not match the sha256sum of the file. """ package_module = package_lookup.PackageModule(package_name) if not package_module: raise errors.Setup.BadPreprovisionedDataError( 'Cannot install preprovisioned data for undefined package %s.' % package_name ) try: preprovisioned_data = package_module.PREPROVISIONED_DATA except AttributeError: raise errors.Setup.BadPreprovisionedDataError( 'Package %s does not define a PREPROVISIONED_DATA dict with ' 'preprovisioned data.' % package_name ) fallback_url = getattr(package_module, 'PACKAGE_DATA_URL', {}) self._InstallData( preprovisioned_data, package_name, filenames, install_path, fallback_url, timeout, ) def ShouldDownloadPreprovisionedData(self, module_name, filename): """Returns whether or not preprovisioned data is available. This function should be overridden by each cloud provider VM. Args: module_name: Name of the module associated with this data file. filename: The name of the file that was downloaded. Returns: A boolean indicates if preprovisioned data is available. """ raise NotImplementedError() def InstallCli(self): """Installs the cloud specific cli along with credentials on this vm.""" raise NotImplementedError() def DownloadPreprovisionedData( self, install_path: str, module_name: str, filename: str, timeout: int = PREPROVISIONED_DATA_TIMEOUT, ): """Downloads preprovisioned benchmark data. This function should be overridden by each cloud provider VM. The file should be downloaded into the install path within a subdirectory with the benchmark name. The downloaded file's parent directory will be created if it does not exist. Args: install_path: The install path on this VM. module_name: Name of the module associated with this data file. filename: The name of the file that was downloaded. timeout: Timeout value for downloading preprovisioned data, default 5 min. """ raise NotImplementedError() def IsInterruptible(self): """Returns whether this vm is a interruptible vm (e.g. spot, preemptible). Caller must call UpdateInterruptibleVmStatus before calling this function to make sure return value is up to date. Returns: True if this vm is a interruptible vm. """ return False def _UpdateInterruptibleVmStatusThroughMetadataService(self): raise NotImplementedError() def _UpdateInterruptibleVmStatusThroughApi(self): # Azure do not support detecting through api pass def UpdateInterruptibleVmStatus(self, use_api=False): """Updates the status of the discounted vm. Args: use_api: boolean, If use_api is false, method will attempt to query metadata service to check vm preemption. If use_api is true, method will attempt to use API to detect vm preemption query if metadata service detecting fails. """ if not self.IsInterruptible(): return if self.WasInterrupted(): return try: self._UpdateInterruptibleVmStatusThroughMetadataService() except (NotImplementedError, errors.VirtualMachine.RemoteCommandError): self._UpdateInterruptibleVmStatusThroughApi() def WasInterrupted(self): """Returns whether this interruptible vm was terminated early. Caller must call UpdateInterruptibleVmStatus before calling this function to make sure return value is up to date. Returns: True if this vm is a interruptible vm was terminated early. """ return False def GetVmStatusCode(self): """Returns the vm status code if any. Caller must call UpdateInterruptibleVmStatus before calling this function to make sure return value is up to date. Returns: Vm status code. """ return None def GetInterruptableStatusPollSeconds(self): """Get seconds between interruptable status polls. Returns: Seconds between polls """ return 5 def _PreDelete(self): """See base class.""" self.LogVmDebugInfo() def UpdateTimeoutMetadata(self): """Updates the timeout tag for the VM.""" def GetNumTeardownSkippedVms(self) -> int | None: """Returns the number of lingering VMs in this VM's zone.""" VirtualMachine = typing.TypeVar('VirtualMachine', bound=BaseVirtualMachine)