perfkitbenchmarker/providers/gcp/gce_disk.py

# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing classes related to GCE disks.

Disks can be created, deleted, attached to VMs, and detached from VMs.
Use 'gcloud compute disk-types list' to determine valid disk types.
"""

import json
import logging
import time
from typing import Any

from absl import flags
import dateutil.parser
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import boot_disk
from perfkitbenchmarker import disk
from perfkitbenchmarker import errors
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import resource
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.configs import option_decoders
from perfkitbenchmarker.providers.gcp import flags as gcp_flags
from perfkitbenchmarker.providers.gcp import util
import requests
from requests import adapters

FLAGS = flags.FLAGS

PD_STANDARD = 'pd-standard'
PD_SSD = 'pd-ssd'
PD_BALANCED = 'pd-balanced'
PD_EXTREME = 'pd-extreme'
HYPERDISK_THROUGHPUT = 'hyperdisk-throughput'
HYPERDISK_EXTREME = 'hyperdisk-extreme'
HYPERDISK_BALANCED = 'hyperdisk-balanced'
HYPERDISK_BALANCED_HA = 'hyperdisk-balanced-high-availability'
GCE_REMOTE_DISK_TYPES = [
    PD_STANDARD,
    PD_SSD,
    PD_BALANCED,
    PD_EXTREME,
    HYPERDISK_THROUGHPUT,
    HYPERDISK_EXTREME,
    HYPERDISK_BALANCED,
    HYPERDISK_BALANCED_HA,
]

# Defaults picked to align with console.
GCE_DYNAMIC_IOPS_DISK_TYPE_DEFAULTS = {
    PD_EXTREME: 100000,
    HYPERDISK_EXTREME: 100000,
    HYPERDISK_BALANCED: 3600,
    HYPERDISK_BALANCED_HA: 3600,
}

# Defaults picked to align with console.
GCE_DYNAMIC_THROUGHPUT_DISK_TYPE_DEFAULTS = {
    HYPERDISK_BALANCED: 290,
    HYPERDISK_THROUGHPUT: 180,
    HYPERDISK_BALANCED_HA: 290,
}

REGIONAL_DISK_SCOPE = 'regional'

DISK_METADATA = {
    PD_STANDARD: {
        disk.MEDIA: disk.HDD,
        disk.REPLICATION: disk.ZONE,
    },
    PD_BALANCED: {
        disk.MEDIA: disk.SSD,
        disk.REPLICATION: disk.ZONE,
    },
    PD_SSD: {
        disk.MEDIA: disk.SSD,
        disk.REPLICATION: disk.ZONE,
    },
    PD_EXTREME: {
        disk.MEDIA: disk.SSD,
        disk.REPLICATION: disk.ZONE,
    },
    HYPERDISK_THROUGHPUT: {
        disk.MEDIA: disk.HDD,
        disk.REPLICATION: disk.ZONE,
    },
    HYPERDISK_EXTREME: {
        disk.MEDIA: disk.SSD,
        disk.REPLICATION: disk.ZONE,
    },
    HYPERDISK_BALANCED: {
        disk.MEDIA: disk.SSD,
        disk.REPLICATION: disk.ZONE,
    },
    HYPERDISK_BALANCED_HA: {
        disk.MEDIA: disk.SSD,
        disk.REPLICATION: disk.REGION,
    },
    disk.LOCAL: {
        disk.MEDIA: disk.SSD,
        disk.REPLICATION: disk.NONE,
    },
}

SCSI = 'SCSI'
NVME = 'NVME'

# Latest GCE families have a preset number of SSDs, so we set those
# attributes directly from the machine type.
FIXED_SSD_MACHINE_TYPES = {
    'z3-highmem-176': 12,
    'z3-highmem-88': 12,
    'c3-standard-4-lssd': 1,
    'c3-standard-8-lssd': 2,
    'c3-standard-22-lssd': 4,
    'c3-standard-44-lssd': 8,
    'c3-standard-88-lssd': 16,
    'c3d-standard-8-lssd': 1,
    'c3d-standard-16-lssd': 1,
    'c3d-standard-30-lssd': 2,
    'c3d-standard-60-lssd': 4,
    'c3d-standard-90-lssd': 8,
    'c3d-standard-180-lssd': 16,
    'c3d-standard-360-lssd': 32,
    'c3d-highmem-8-lssd': 1,
    'c3d-highmem-16-lssd': 1,
    'c3d-highmem-30-lssd': 2,
    'c3d-highmem-60-lssd': 4,
    'c3d-highmem-90-lssd': 8,
    'c3d-highmem-180-lssd': 16,
    'c3d-highmem-360-lssd': 32,
}

NVME_PD_MACHINE_FAMILIES = ['m3']


class GceServiceUnavailableError(Exception):
  """Error for retrying _Attach when the describe output indicates that
  'The service is currently unavailable'.
  """


def PdDriveIsNvme(vm):
  """Check if the machine uses NVMe for PD."""
  machine_type = vm.machine_type
  family = machine_type.split('-')[0].lower()
  if family in NVME_PD_MACHINE_FAMILIES:
    return True
  # In other cases, only a sub-category of a family uses nvme,
  # such as confidential VMs on Milan.
  # this is not robust, but can get refactored when
  # there is more clarity on what groups of VMs are NVMe.
  if gcp_flags.GCE_CONFIDENTIAL_COMPUTE.value:
    return True
  return False


# Add labels fails sometimes with a timeout - consider moving to Create().
@vm_util.Retry(
    max_retries=3,
    retryable_exceptions=(
        errors.VmUtil.IssueCommandTimeoutError,
    ),
)
def AddLabels(gcp_resource: resource.BaseResource, disk_name: str):
  """Add labels to a disk created by a service that fails to label a disk.

  Disks created by PKB (and most services) are labeled at create time.

  Args:
    gcp_resource: a resource with the project and zone of the disk.
    disk_name: the name of the disk
  """
  cmd = util.GcloudCommand(
      gcp_resource, 'compute', 'disks', 'add-labels', disk_name
  )
  cmd.flags['labels'] = util.MakeFormattedDefaultTags()
  cmd.Issue()


class GceDiskSpec(disk.BaseDiskSpec):
  """Object holding the information needed to create a GCPDisk."""

  interface: str
  num_local_ssds: int
  create_with_vm: bool
  replica_zones: list[str]

  CLOUD = provider_info.GCP

  @classmethod
  def _ApplyFlags(cls, config_values, flag_values):
    """Overrides config values with flag values.

    Can be overridden by derived classes to add support for specific flags.

    Args:
      config_values: dict mapping config option names to provided values. Is
        modified by this function.
      flag_values: flags.FlagValues. Runtime flags that may override the
        provided config values.

    Returns:
      dict mapping config option names to values derived from the config
      values or flag values.
    """
    super()._ApplyFlags(config_values, flag_values)
    if flag_values['gce_ssd_interface'].present:
      config_values['interface'] = flag_values.gce_ssd_interface
    if flag_values['gce_num_local_ssds'].present:
      config_values['num_local_ssds'] = flag_values.gce_num_local_ssds
    if flag_values['gcp_create_disks_with_vm'].present:
      config_values['create_with_vm'] = flag_values.gcp_create_disks_with_vm
    if flag_values['data_disk_zones'].present:
      config_values['replica_zones'] = flag_values.data_disk_zones

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Gets decoder classes and constructor args for each configurable option.

    Can be overridden by derived classes to add options or impose additional
    requirements on existing options.

    Returns:
      dict. Maps option name string to a (ConfigOptionDecoder class, dict)
      pair. The pair specifies a decoder class and its __init__() keyword
      arguments to construct in order to decode the named option.
""" result = super()._GetOptionDecoderConstructions() result.update({ 'interface': (option_decoders.StringDecoder, {'default': 'NVME'}), 'num_local_ssds': ( option_decoders.IntDecoder, {'default': 0, 'min': 0}, ), 'create_with_vm': ( option_decoders.BooleanDecoder, {'default': True}, ), 'replica_zones': ( option_decoders.ListDecoder, { 'item_decoder': option_decoders.StringDecoder(), 'default': None, }, ), }) return result class GceBootDisk(boot_disk.BootDisk): """Object representing a GCE Boot Disk.""" def __init__(self, vm, boot_disk_spec): """Initialize a Boot Diks.""" super().__init__(boot_disk_spec) self.name = vm.name self.image = None self.vm = vm self.boot_disk_size = boot_disk_spec.boot_disk_size self.boot_disk_type = boot_disk_spec.boot_disk_type self.boot_disk_iops = None self.boot_disk_throughput = None if self.boot_disk_type in GCE_DYNAMIC_IOPS_DISK_TYPE_DEFAULTS.keys(): self.boot_disk_iops = boot_disk_spec.boot_disk_iops if self.boot_disk_type in GCE_DYNAMIC_THROUGHPUT_DISK_TYPE_DEFAULTS.keys(): self.boot_disk_throughput = boot_disk_spec.boot_disk_throughput def GetCreationCommand(self): dic = {'boot-disk-auto-delete': True} if self.boot_disk_size: dic['boot-disk-size'] = self.boot_disk_size if self.boot_disk_type: dic['boot-disk-type'] = self.boot_disk_type if self.boot_disk_iops: dic['boot-disk-provisioned-iops'] = self.boot_disk_iops if self.boot_disk_throughput: dic['boot-disk-provisioned-throughput'] = self.boot_disk_throughput return dic @vm_util.Retry() def PostCreate(self): getdisk_cmd = util.GcloudCommand( self.vm, 'compute', 'disks', 'describe', self.name ) stdout, _, _ = getdisk_cmd.Issue() response = json.loads(stdout) # Updates all the attrubutes to make sure it's up to date self.image = response['sourceImage'].split('/')[-1] self.boot_disk_size = response['sizeGb'] self.boot_disk_type = response['type'].split('/')[-1] def GetResourceMetadata(self): result = super().GetResourceMetadata() result['boot_disk_type'] = self.boot_disk_type result['boot_disk_size'] = self.boot_disk_size if self.boot_disk_iops: result['boot_disk_provisioned_iops'] = self.boot_disk_iops if self.boot_disk_throughput: result['boot_disk_provisioned_throughput'] = self.boot_disk_throughput return result class GceLocalDisk(disk.BaseDisk): """Object representing a GCE Local Disk.""" def __init__(self, disk_spec, name): super().__init__(disk_spec) self.interface = disk_spec.interface self.metadata['interface'] = disk_spec.interface self.metadata.update(DISK_METADATA[disk_spec.disk_type]) self.name = name def GetDevicePath(self) -> str: return f'/dev/disk/by-id/google-{self.name}' def IsNvme(self): return self.interface == NVME class GceDisk(disk.BaseDisk): """Object representing a GCE Disk.""" def __init__( self, disk_spec, name, zone, project, image=None, image_project=None, replica_zones=None, ): super().__init__(disk_spec) self.spec = disk_spec self.interface = disk_spec.interface self.attached_vm_name = None self.image = image self.image_project = image_project self.name = name self.zone = zone self.project = project self.replica_zones = replica_zones self.region = util.GetRegionFromZone(self.zone) self.provisioned_iops = None self.provisioned_throughput = None if self.disk_type in GCE_DYNAMIC_IOPS_DISK_TYPE_DEFAULTS.keys(): self.provisioned_iops = disk_spec.provisioned_iops if not self.provisioned_iops: self.provisioned_iops = GCE_DYNAMIC_IOPS_DISK_TYPE_DEFAULTS[ self.disk_type ] self.metadata['iops'] = self.provisioned_iops if self.disk_type in 
GCE_DYNAMIC_THROUGHPUT_DISK_TYPE_DEFAULTS.keys(): self.provisioned_throughput = disk_spec.provisioned_throughput if not self.provisioned_throughput: self.provisioned_throughput = GCE_DYNAMIC_THROUGHPUT_DISK_TYPE_DEFAULTS[ self.disk_type ] self.metadata['throughput'] = self.provisioned_throughput disk_metadata = DISK_METADATA[disk_spec.disk_type] if self.replica_zones: disk_metadata[disk.REPLICATION] = disk.REGION self.metadata['replica_zones'] = replica_zones self.metadata.update(DISK_METADATA[disk_spec.disk_type]) def _Create(self): """Creates the disk.""" cmd = util.GcloudCommand(self, 'compute', 'disks', 'create', self.name) cmd.flags['size'] = self.disk_size cmd.flags['type'] = self.disk_type if self.provisioned_iops: cmd.flags['provisioned-iops'] = self.provisioned_iops if self.provisioned_throughput: cmd.flags['provisioned-throughput'] = self.provisioned_throughput cmd.flags['labels'] = util.MakeFormattedDefaultTags() if self.image: cmd.flags['image'] = self.image if self.image_project: cmd.flags['image-project'] = self.image_project if self.replica_zones: cmd.flags['region'] = self.region cmd.flags['replica-zones'] = ','.join(self.replica_zones) del cmd.flags['zone'] if self.multi_writer_disk: cmd.flags['access-mode'] = 'READ_WRITE_MANY' if self.spec.snapshot_name: cmd.flags['source-snapshot'] = self.spec.snapshot_name self.create_disk_start_time = time.time() stdout, stderr, retcode = cmd.Issue(raise_on_failure=False) util.CheckGcloudResponseKnownFailures(stderr, retcode) self.create_disk_end_time = self._GetEndTime(stdout) def _Delete(self): """Deletes the disk, as well as all associated snapshot and restore disks.""" if self.snapshots: for snapshot in self.snapshots: snapshot.Delete() cmd = util.GcloudCommand(self, 'compute', 'disks', 'delete', self.name) if self.replica_zones: cmd.flags['region'] = self.region del cmd.flags['zone'] cmd.Issue(raise_on_failure=False) def _Describe(self): """Returns json describing the disk or None on failure.""" cmd = util.GcloudCommand(self, 'compute', 'disks', 'describe', self.name) if self.replica_zones: cmd.flags['region'] = self.region del cmd.flags['zone'] stdout, _, _ = cmd.Issue(raise_on_failure=False) try: result = json.loads(stdout) except ValueError: return None return result def _IsReady(self): """Returns true if the disk is ready.""" result = self._Describe() if not result: return False return result.get('status') == 'READY' def _Exists(self): """Returns true if the disk exists.""" result = self._Describe() return bool(result) def Exists(self): return self._Exists() def _GetEndTime(self, cmd_issue_response: str): """Returns the end time of the attach operation.""" end_time = time.time() return end_time @vm_util.Retry( poll_interval=30, max_retries=10, retryable_exceptions=( GceServiceUnavailableError, errors.VmUtil.IssueCommandTimeoutError, ), ) def _Attach(self, vm): """Attaches the disk to a VM. Args: vm: The GceVirtualMachine instance to which the disk will be attached. 
    Raises:
      GceServiceUnavailableError: when the service is not available
    """
    self.attached_vm_name = vm.name
    cmd = util.GcloudCommand(
        self, 'compute', 'instances', 'attach-disk', self.attached_vm_name
    )
    cmd.flags['device-name'] = self.name
    cmd.flags['disk'] = self.name
    if self.replica_zones:
      cmd.flags['disk-scope'] = REGIONAL_DISK_SCOPE
      cmd.flags['region'] = self.region
    self.attach_start_time = time.time()
    stdout, stderr, retcode = cmd.Issue(raise_on_failure=False)
    self.attach_end_time = self._GetEndTime(stdout)
    # Gcloud attach-disk commands may still attach disks despite being rate
    # limited.
    if retcode:
      if (
          'The service is currently unavailable' in stderr
          or 'Please try again in 30 seconds.' in stderr
      ):
        logging.info('disk attach command failed, retrying.')
        raise GceServiceUnavailableError()
      if (
          cmd.rate_limited
          and 'is already being used' in stderr
          and FLAGS.retry_on_rate_limited
      ):
        return
      debug_text = 'Ran: {%s}\nReturnCode:%s\nSTDOUT: %s\nSTDERR: %s' % (
          ' '.join(cmd.GetCommand()),
          retcode,
          stdout,
          stderr,
      )
      raise errors.VmUtil.CalledProcessException(
          'Command returned a non-zero exit code:\n{}'.format(debug_text)
      )

  def GetCreateFlags(self):
    name = self.name
    pd_args = [
        f'name={name}',
        f'device-name={name}',
        f'size={self.spec.disk_size}',
        f'type={self.spec.disk_type}',
        'auto-delete=yes',
        'boot=no',
        'mode=rw',
    ]
    if self.provisioned_iops:
      pd_args += [f'provisioned-iops={self.provisioned_iops}']
    if self.provisioned_throughput:
      pd_args += [f'provisioned-throughput={self.provisioned_throughput}']
    return ','.join(pd_args)

  def _Detach(self):
    """Detaches the disk from a VM."""
    cmd = util.GcloudCommand(
        self, 'compute', 'instances', 'detach-disk', self.attached_vm_name
    )
    cmd.flags['device-name'] = self.name
    if self.replica_zones:
      cmd.flags['disk-scope'] = REGIONAL_DISK_SCOPE
    stdout, _ = cmd.IssueRetryable()
    self.attached_vm_name = None
    self.detach_end_time = self._GetEndTime(stdout)

  def GetDevicePath(self):
    """Returns the path to the device inside the VM."""
    if self.interface == NVME:
      return self.name
    # by default, returns this name id.
    return f'/dev/disk/by-id/google-{self.name}'

  def IsNvme(self):
    return self.interface == NVME

  def CreateSnapshot(self):
    """Creates a snapshot of the disk."""
    self.snapshots.append(GceDiskSnapshot(self))
    self.snapshots[-1].Create()


class GceDiskSnapshot(disk.DiskSnapshot):
  """Object representing a GCE Disk Snapshot.

  Attributes:
    source_disk: The GceDisk object that the snapshot is created from.
    disk_spec: The disk spec of the source disk.
    source_disk_name: The name of the source disk.
    source_disk_size: The size of the source disk.
    source_disk_type: The type of the source disk.
    name: The name of the snapshot.
    zone: The zone of the source disk.
    region: The region of the source disk.
    project: The project of the source disk.
    storage_gb: The storage used by the snapshot in GB.
    restore_disks: A list of GceDisk objects created from the snapshot.
    num_restore_disks: The number of disks restored from this snapshot.
  """

  def __init__(self, source_disk):
    super().__init__()
    self.source_disk = source_disk
    self.disk_spec = source_disk.spec
    self.source_disk_name = source_disk.name
    self.source_disk_size = source_disk.disk_size
    self.source_disk_type = source_disk.disk_type
    self.name = f'disk-snapshot-{self.source_disk_name}'
    self.zone = source_disk.zone
    self.region = util.GetRegionFromZone(self.zone)
    self.project = source_disk.project

  def _Create(self):
    """Creates a snapshot of the disk.

    Raises:
      errors.VmUtil.CalledProcessException: When the command returns a non-zero
        exit code.
""" cmd = util.GcloudCommand(self, 'compute', 'snapshots', 'create', self.name) del cmd.flags['zone'] cmd.flags['source-disk'] = self.source_disk_name cmd.flags['source-disk-zone'] = self.zone cmd.flags['storage-location'] = self.region cmd.flags['labels'] = util.MakeFormattedDefaultTags() self.creation_start_time = time.time() stdout, stderr, retcode = cmd.Issue(raise_on_failure=False) self.creation_end_time = self._Describe() if retcode: debug_text = 'Ran: {%s}\nReturnCode:%s\nSTDOUT: %s\nSTDERR: %s' % ( ' '.join(cmd.GetCommand()), retcode, stdout, stderr, ) raise errors.VmUtil.CalledProcessException( 'Command returned a non-zero exit code:\n{}'.format(debug_text) ) @vm_util.Retry( poll_interval=0.5, max_retries=10, log_errors=True, retryable_exceptions=(errors.Resource.RetryableCreationError,), ) def _Describe(self): """Describe the snapshot, storing the storage size in GB. Returns: float: The time when the snapshot was created. Raises: errors.Resource.RetryableCreationError: If the snapshot is not created and ready. """ cmd = util.GcloudCommand( self, 'compute', 'snapshots', 'describe', self.name ) del cmd.flags['zone'] stdout, _, _ = cmd.Issue(raise_on_failure=False) result = json.loads(stdout) self.storage_gb = int(result['storageBytes']) // (2**30) snapshot_status = result['status'] if snapshot_status == 'READY': return time.time() logging.info( 'Disk %s snapshot %s has status %s.', self.source_disk_name, self.name, snapshot_status, ) raise errors.Resource.RetryableCreationError() def Restore(self): """Creates a disk from the snapshot.""" self.restore_disk_name = f'{self.name}-restore-{self.num_restore_disks}' self.disk_spec.snapshot_name = self.name restore_disk = GceDisk( self.disk_spec, self.restore_disk_name, self.zone, self.project ) restore_disk.Create() self.restore_disks.append(restore_disk) self.num_restore_disks += 1 return self.restore_disks[-1] def Delete(self): """Deletes the snapshot.""" if self.restore_disks: for restore_disk in self.restore_disks: restore_disk.Delete() self.num_restore_disks -= 1 cmd = util.GcloudCommand(self, 'compute', 'snapshots', 'delete', self.name) cmd.Issue(raise_on_failure=False) class GceStripedDisk(disk.StripedDisk): """Object representing multiple GCP disks striped together.""" def __init__(self, disk_spec, disks): super().__init__(disk_spec, disks) if len(disks) <= 1: raise ValueError( f'{len(disks)} disks found for GceStripedDisk' ) self.disks = disks self.spec = disk_spec self.interface = disk_spec.interface data_disk = disks[0] self.attached_vm_name = None self.image = data_disk.image self.image_project = data_disk.image_project self.zone = data_disk.zone self.project = data_disk.project self.replica_zones = data_disk.replica_zones self.region = util.GetRegionFromZone(self.zone) self.provisioned_iops = data_disk.provisioned_iops self.provisioned_throughput = data_disk.provisioned_throughput self.metadata['iops'] = self.provisioned_iops self.metadata['throughput'] = self.provisioned_throughput def _Create(self): """Creates the disk.""" create_tasks = [] for disk_details in self.disks: create_tasks.append((disk_details.Create, (), {})) background_tasks.RunParallelThreads(create_tasks, max_concurrency=200) def _PostCreate(self): """Called after _CreateResource() is called.""" for disk_details in self.disks: disk_details.created = True def _GetDiskNames(self): return [d.name for d in self.disks] def _Attach(self, vm): attach_tasks = [] for disk_details in self.disks: attach_tasks.append((disk_details.Attach, [vm], {})) 
    background_tasks.RunParallelThreads(attach_tasks, max_concurrency=200)

  def _Exists(self):
    for disk_details in self.disks:
      if not disk_details.Exists():
        return False
    return True

  def _Detach(self):
    detach_tasks = []
    for disk_details in self.disks:
      detach_tasks.append((disk_details.Detach, (), {}))
    background_tasks.RunParallelThreads(detach_tasks, max_concurrency=200)
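

# Hedged usage sketch (not part of the original module): these classes are
# normally driven by PKB's provisioning flow rather than instantiated by hand.
# Assuming a GceDiskSpec `spec` already built from a benchmark config and a
# provisioned GceVirtualMachine `vm` (both hypothetical here), the data-disk
# lifecycle implemented above looks roughly like this; the disk name is an
# illustrative placeholder, and only methods defined or called in this module
# are used:
#
#   data_disk = GceDisk(spec, 'pkb-data-0', zone=vm.zone, project=vm.project)
#   data_disk.Create()    # gcloud compute disks create pkb-data-0 ...
#   data_disk.Attach(vm)  # gcloud compute instances attach-disk ...
#   ...                   # benchmark I/O against data_disk.GetDevicePath()
#   data_disk.Detach()
#   data_disk.Delete()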