perfkitbenchmarker/disk_strategies.py (358 lines of code) (raw):

# Copyright 2023 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing strategies to prepare disks.

This module abstracts out the disk strategies of a virtual machine so code can
be reused and shared across clouds.

There are two main types of disk strategies:
1. CreateDiskStrategy - This strategy governs how a virtual machine should
   create the disk resource.
2. SetUpDiskStrategy - This strategy controls how disks are set up.

The module also provides PrepareScratchDiskStrategy (format/mount scratch
disks) and DetachDiskStrategy (detach disks and wait for the guest to notice).
"""

import copy
import json
import logging
import time
from typing import Any, Union

from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import context as pkb_context
from perfkitbenchmarker import disk
from perfkitbenchmarker import nfs_service
from perfkitbenchmarker import os_types
from perfkitbenchmarker import vm_util

FLAGS = flags.FLAGS

virtual_machine = Any  # virtual_machine.py imports this module.


class CreateDiskStrategy:
  """Handles disk-related logic to create a VM's disk resources.

  This strategy class handles creation of a disk.

  Attributes:
    vm: The virtual machine.
    disk_spec: Spec of the disk to be created.
    disk_count: The number of disks, exactly as passed in by the caller (this
      may be None for local disks; see __init__).
    disk_specs: Copies of disk_spec, one per disk to create.
  """

  def __init__(
      self,
      vm: 'virtual_machine.BaseVirtualMachine',
      disk_spec: disk.BaseDiskSpec,
      disk_count: int,
  ):
    self.vm = vm
    self.disk_spec = disk_spec
    # NOTE(review): self.disk_count keeps the caller's value even when it is
    # None and the local-disk default below is used; subclasses may rely on
    # this, so it is intentionally left unchanged.
    self.disk_count = disk_count
    if disk_spec.disk_type == disk.LOCAL and disk_count is None:
      # Default to every local disk the VM offers.
      disk_count = self.vm.max_local_disks
    self.disk_specs = [copy.copy(disk_spec) for _ in range(disk_count)]
    # In the event that we need to create multiple disks from the same
    # DiskSpec, we need to ensure that they have different mount points.
    if disk_count > 1 and disk_spec.mount_point:
      for i, vm_disk_spec in enumerate(self.disk_specs):
        vm_disk_spec.mount_point += str(i)

  def AddMetadataToDiskResource(self):
    """Add metadata to the disk resource for tagging purpose."""
    pass

  def GetCreationCommand(self) -> dict[str, Any]:
    """Returns the command to create the disk resource with the VM."""
    return {}

  def CreateAndAttachDisk(self) -> None:
    """Calls _CreateAndAttachDisk when DiskCreatedOnVMCreation() is True.

    NOTE(review): the guard skips _CreateAndAttachDisk when
    DiskCreatedOnVMCreation() is False; confirm against subclasses that this
    is the intended polarity.
    """
    if not self.DiskCreatedOnVMCreation():
      return
    self._CreateAndAttachDisk()

  def _CreateAndAttachDisk(self) -> None:
    """Creates and attaches the disk resource to the VM."""
    raise NotImplementedError()

  def GetResourceMetadata(self):
    """Returns the metadata of the disk resource."""
    return {}

  def DiskCreatedOnVMCreation(self) -> bool:
    """Returns whether the disk is created on the VM."""
    raise NotImplementedError()

  def GetSetupDiskStrategy(self) -> 'SetUpDiskStrategy':
    """Returns the SetUpDiskStrategy for the disk."""
    return EmptySetupDiskStrategy(self.vm, self.disk_spec)


class EmptyCreateDiskStrategy(CreateDiskStrategy):
  """Strategies to create nothing. Useful when there is no resource disk."""

  # pylint: disable=super-init-not-called
  def __init__(self, vm: Any, disk_spec: disk.BaseDiskSpec, disk_count: int):
    self.disk_spec = disk_spec
    self.disk_count = disk_count
    self.vm = vm

  def _CreateAndAttachDisk(self) -> None:
    """Does nothing."""
    return

  def DiskCreatedOnVMCreation(self) -> bool:
    return True


class DisksAreNotVisibleError(Exception):
  """Raised (and retried) while attached disks are not yet visible."""


class DisksAreNotDetachedError(Exception):
  """Raised (and retried) while detached disks are still visible."""


def GetNonBootDiskCount(vm) -> int:
  """Returns the number of non-boot disks attached to the VM.

  Runs `lsblk --json` on the VM, counts block devices whose type is 'disk',
  and subtracts one for the boot disk.

  Args:
    vm: The VM to query.

  Returns:
    The number of 'disk' block devices minus one.

  Raises:
    json.JSONDecodeError: if lsblk produced no (or non-JSON) output; stderr is
      logged before the decode is attempted.
  """
  stdout, stderr = vm.RemoteCommand('lsblk --json', ignore_failure=True)
  if not stdout:
    logging.error(stderr)
  disks_json = json.loads(stdout)
  block_devices = disks_json['blockdevices']
  # This logic won't work for VMs that come with local ssd because
  # the type is 'disk' for local ssd as well.
  block_devices_disks = [d for d in block_devices if d['type'] == 'disk']
  non_boot_disks_attached = (
      len(block_devices_disks) - 1
  )  # subtracting boot disk
  return non_boot_disks_attached


class SetUpDiskStrategy:
  """Base strategy for setting up disks on a VM.

  Subclasses override SetUpDiskOnLinux and/or SetUpDiskOnWindows.
  """

  def __init__(
      self,
      vm: 'virtual_machine.BaseVirtualMachine',
      disk_spec: disk.BaseDiskSpec,
  ):
    self.vm = vm
    self.disk_spec = disk_spec
    # Disks tracked by this strategy; counted by GetTotalDiskCount.
    self.scratch_disks = []

  def SetUpDisk(self) -> None:
    """Dispatches to the Linux or Windows setup based on the VM's OS type."""
    if self.vm.OS_TYPE in os_types.LINUX_OS_TYPES:
      self.SetUpDiskOnLinux()
    else:
      self.SetUpDiskOnWindows()

  def SetUpDiskOnWindows(self):
    """Performs Windows-specific disk setup; unsupported by default."""
    raise NotImplementedError(
        f'{self.disk_spec.disk_type} is not supported on Windows.'
    )

  def SetUpDiskOnLinux(self):
    """Performs Linux-specific disk setup; unsupported by default."""
    raise NotImplementedError(
        f'{self.disk_spec.disk_type} is not supported on linux.'
    )

  def GetTotalDiskCount(self) -> int:
    """Returns the number of raw disks in self.scratch_disks.

    A StripedDisk counts as the number of disks it stripes together; any other
    disk counts as one.
    """
    total_disks = 0
    for scratch_disk in self.scratch_disks:
      if isinstance(scratch_disk, disk.StripedDisk):
        total_disks += len(scratch_disk.disks)
      else:
        total_disks += 1
    return total_disks

  def CheckDisksVisibility(self):
    """Checks for all the disks to be visible from the VM.

    Returns:
      True if the guest sees exactly GetTotalDiskCount() non-boot disks.
    """
    return GetNonBootDiskCount(self.vm) == self.GetTotalDiskCount()

  @vm_util.Retry(
      poll_interval=1,
      timeout=200,
      max_retries=200,
      retryable_exceptions=(DisksAreNotVisibleError,),
  )
  def WaitForDisksToVisibleFromVm(self, start_time) -> float:
    """Waits for the disks to be visible from the Guest.

    Args:
      start_time: time when the attach operation started.

    Returns:
      time taken for all the disks to be visible from the Guest, or -1 on
      non-Linux VMs (not implemented there).

    Raises:
      DisksAreNotVisibleError: if the disks are not visible.
    """
    # not implemented for Windows
    if self.vm.OS_TYPE not in os_types.LINUX_OS_TYPES:
      return -1
    # Check once per poll; each check is a remote lsblk call.
    if not self.CheckDisksVisibility():
      raise DisksAreNotVisibleError('Disks not visible')
    return time.time() - start_time


class EmptySetupDiskStrategy(SetUpDiskStrategy):
  """Strategies to set up nothing. This is useful when there is no disk."""

  def SetUpDisk(self) -> None:
    pass


class SetUpRamDiskStrategy(SetUpDiskStrategy):
  """Strategies to set up ram disks."""

  def SetUpDiskOnLinux(self):
    """Mounts a tmpfs RAM disk at the spec's mount point on Linux."""
    scratch_disk = disk.BaseDisk(self.disk_spec)
    logging.info(
        'Mounting and creating Ram Disk %s, %s',
        scratch_disk.mount_point,
        scratch_disk.disk_size,
    )
    mnt_cmd = (
        'sudo mkdir -p {0};sudo mount -t tmpfs -o size={1}g tmpfs {0};'
        'sudo chown -R $USER:$USER {0};'
    ).format(scratch_disk.mount_point, scratch_disk.disk_size)
    self.vm.RemoteHostCommand(mnt_cmd)
    # Track the same disk object that was just mounted.
    self.vm.scratch_disks.append(scratch_disk)


class SetUpNFSDiskStrategy(SetUpDiskStrategy):
  """Strategies to set up NFS disks."""

  def __init__(
      self,
      vm,
      disk_spec: disk.BaseDiskSpec,
      unmanaged_nfs_service: nfs_service.BaseNfsService | None = None,
  ):
    """Initializes the strategy.

    Args:
      vm: The VM to mount the NFS disk on.
      disk_spec: Spec of the disk (mount point, disk type).
      unmanaged_nfs_service: Optional NFS service to use; when omitted, the
        current thread's benchmark spec's `nfs_service` is used.
    """
    super().__init__(vm, disk_spec)
    if unmanaged_nfs_service:
      self.nfs_service = unmanaged_nfs_service
    else:
      self.nfs_service = getattr(
          pkb_context.GetThreadBenchmarkSpec(), 'nfs_service'
      )

  def SetUpDiskOnLinux(self):
    """Installs NFS utilities and mounts an NFS disk on Linux."""
    self.vm.Install('nfs_utils')
    nfs_disk = self.nfs_service.CreateNfsDisk()
    self.vm.MountDisk(
        nfs_disk.GetDevicePath(),
        self.disk_spec.mount_point,
        self.disk_spec.disk_type,
        nfs_disk.mount_options,
        nfs_disk.fstab_options,
    )

    self.vm.scratch_disks.append(nfs_disk)


class SetUpSMBDiskStrategy(SetUpDiskStrategy):
  """Strategies to set up SMB disks."""

  def SetUpDiskOnLinux(self):
    """Mounts an SMB disk from the benchmark spec's smb_service on Linux."""
    smb_service = getattr(pkb_context.GetThreadBenchmarkSpec(), 'smb_service')
    smb_disk = smb_service.CreateSmbDisk()

    self.vm.MountDisk(
        smb_disk.GetDevicePath(),
        self.disk_spec.mount_point,
        self.disk_spec.disk_type,
        smb_disk.mount_options,
        smb_disk.fstab_options,
    )

    self.vm.scratch_disks.append(smb_disk)


class PrepareScratchDiskStrategy:
  """Strategies to prepare scratch disks."""

  TEMPDB_DISK_LETTER = 'T'

  def PrepareScratchDisk(
      self,
      vm,
      scratch_disk: Union[disk.BaseDisk, disk.StripedDisk],
      disk_spec: disk.BaseDiskSpec,
  ) -> None:
    """Dispatches to the Linux or Windows preparation by VM OS type."""
    if vm.OS_TYPE in os_types.LINUX_OS_TYPES:
      self.PrepareLinuxScratchDisk(vm, scratch_disk, disk_spec)
    else:
      self.PrepareWindowsScratchDisk(vm, scratch_disk, disk_spec)

  def PrepareLinuxScratchDisk(
      self,
      vm,
      scratch_disk: Union[disk.BaseDisk, disk.StripedDisk],
      disk_spec: disk.BaseDiskSpec,
  ) -> None:
    """Format and mount scratch disk.

    Args:
      vm: Linux Virtual Machine to create scratch disk on.
      scratch_disk: Scratch disk to be formatted and mounted.
      disk_spec: The BaseDiskSpec object corresponding to the disk.
    """
    # the scratch disk is a logical device striped together from raw disks
    # scratch disk device path == disk_spec device path
    # scratch disk device path != raw disks device path
    scratch_disk_device_path = '/dev/md%d' % len(vm.scratch_disks)
    if (
        isinstance(scratch_disk, disk.StripedDisk)
        and scratch_disk.is_striped
        and not vm.hasStripedDiskDevice(scratch_disk_device_path)
    ):
      scratch_disk.device_path = scratch_disk_device_path
      disk_spec.device_path = scratch_disk_device_path
      raw_device_paths = [d.GetDevicePath() for d in scratch_disk.disks]
      vm.StripeDisks(raw_device_paths, scratch_disk.GetDevicePath())

    if disk_spec.mount_point and not vm.IsMounted(
        disk_spec.mount_point, scratch_disk.GetDevicePath()
    ):
      vm.FormatDisk(scratch_disk.GetDevicePath(), disk_spec.disk_type)
      vm.MountDisk(
          scratch_disk.GetDevicePath(),
          disk_spec.mount_point,
          disk_spec.disk_type,
          scratch_disk.mount_options,
          scratch_disk.fstab_options,
      )
    vm.scratch_disks.append(scratch_disk)

  def PrepareWindowsScratchDisk(
      self,
      vm,
      scratch_disk: Union[disk.BaseDisk, disk.StripedDisk],
      disk_spec: disk.BaseDiskSpec,
  ) -> None:
    """Format and mount scratch disk.

    Args:
      vm: Windows Virtual Machine to create scratch disk on.
      scratch_disk: Scratch disk to be formatted and mounted.
      disk_spec: The BaseDiskSpec object corresponding to the disk.

    Raises:
      IndexError: if the disk is not striped and no matching raw disk exists.
    """
    # Create and then run a Diskpart script that will initialize the disks,
    # create a volume, and then format and mount the volume.
    script = ''

    # Get DeviceId and Model (FriendlyName) for all disks
    # attached to the VM except boot disk.
    # Using Get-Disk has option to query for disks with no partition
    # (partitionstyle -eq 'raw'). Returned Number and FriendlyName represent
    # DeviceID and model of the disk. Device ID is used for Diskpart cleanup.
    # https://learn.microsoft.com/en-us/powershell/module/
    # storage/get-disk?view=windowsserver2022-ps
    stdout, _ = vm.RemoteCommand(
        "Get-Disk | Where partitionstyle -eq 'raw' | "
        'Select Number,FriendlyName'
    )
    query_disk_numbers = []
    for line in stdout.splitlines():
      line = line.strip()
      # Skip blank and whitespace-only lines in the table output.
      if not line:
        continue
      # partition() (unlike a 2-way unpack of split) tolerates rows that have
      # no FriendlyName column.
      device, _, model = line.partition(' ')
      # Skip the table header and the dashed separator row.
      if 'Number' in device or '-----' in device:
        continue
      if disk_spec.disk_type == 'local':
        if vm.DiskDriveIsLocal(device, model):
          query_disk_numbers.append(device)
      else:
        if not vm.DiskDriveIsLocal(device, model):
          query_disk_numbers.append(device)

    if scratch_disk.is_striped:
      disk_numbers = query_disk_numbers
    else:
      # Wrap in a list: iterating the bare string would walk its characters
      # and split multi-digit disk numbers such as '10' into '1' and '0'.
      disk_numbers = [query_disk_numbers[0]]

    for disk_number in disk_numbers:
      # For each disk, set the status to online (if it is not already),
      # remove any formatting or partitioning on the disks, and convert
      # it to a dynamic disk so it can be used to create a volume.
      # TODO(user): Fix on Azure machines with temp disk, e.g.
      # Ebdsv5 which have 1 local disk with partitions. Perhaps this means
      # removing the clean and convert gpt lines.
      script += (
          'select disk %s\n'
          'online disk noerr\n'
          'attributes disk clear readonly\n'
          'clean\n'
          'convert gpt\n'
          'convert dynamic\n' % disk_number
      )

    # Create a volume out of the disk(s).
    if scratch_disk.is_striped:
      script += 'create volume stripe disk=%s\n' % ','.join(disk_numbers)
    else:
      script += 'create volume simple\n'

    # If a mount point has been specified, create the directory where it will
    # be mounted and assign the mount point to the volume.
    mount_command = ''
    if disk_spec.mount_point:
      vm.RemoteCommand('mkdir %s' % disk_spec.mount_point)
      mount_command = 'assign mount=%s\n' % disk_spec.mount_point

    # Format the volume, based on OS type.
    if vm.OS_TYPE in os_types.WINDOWS_SQLSERVER_OS_TYPES:
      format_command = 'format fs=ntfs quick unit=64k'
    else:
      format_command = 'format quick'

    # Add format type, drive letter and mount point for the diskpart script.
    script += '%s\nassign letter=%s\n%s' % (
        format_command,
        vm.assigned_disk_letter.lower(),
        mount_command,
    )

    # No-op, useful for understanding the state of the disks
    vm.RunDiskpartScript('list disk')
    vm.RunDiskpartScript(script)

    # Grant user permissions on the drive
    vm.RemoteCommand(
        'icacls {}: /grant Users:F /L'.format(vm.assigned_disk_letter)
    )
    vm.RemoteCommand(
        'icacls {}: --% /grant Users:(OI)(CI)F /L'.format(
            vm.assigned_disk_letter
        )
    )
    vm.scratch_disks.append(scratch_disk)

    if (
        vm.OS_TYPE in os_types.WINDOWS_SQLSERVER_OS_TYPES
        and disk_spec.disk_type != 'local'
    ):
      self.PrepareTempDbDisk(vm)

  def GetLocalSSDNames(self) -> list[str]:
    """Names of local ssd device when running Get-PhysicalDisk."""
    return []

  def GetLocalSSDDeviceIDs(self, vm) -> list[str]:
    """Get all local ssd device ids.

    Returns:
      DeviceIDs (as normalized integer strings) of physical disks whose
      FriendlyName matches GetLocalSSDNames(); empty if that list is empty.
    """
    names = self.GetLocalSSDNames()

    if not names:
      # This function is not implemented in this cloud provider.
      logging.info('Temp DB is not supported on this cloud')
      return []

    clause = ' -or '.join([f'($_.FriendlyName -eq "{name}")' for name in names])
    clause = '{' + clause + '}'

    # Select the DeviceID from the output
    # It will be one integer per line.
    # i.e
    # 3
    # 4
    stdout, _ = vm.RemoteCommand(
        f'Get-PhysicalDisk | where-object {clause} | Select -exp DeviceID'
    )

    # int casting will clean up the spacing and make sure we are getting
    # correct device id. Whitespace-only lines are skipped so they do not
    # reach int().
    return [
        str(int(device_id))
        for device_id in stdout.splitlines()
        if device_id.strip()
    ]

  def PrepareTempDbDisk(self, vm: 'virtual_machine.BaseVirtualMachine'):
    """Format and mount temp db disk for sql server."""
    # Create and then run a Diskpart script that will initialize the disks,
    # create a volume, and then format and mount the volume.
    # This would trigger when we detect local ssds on the VM
    # and the disk type is not set to local ssd.
    script = ''
    local_ssd_disks = self.GetLocalSSDDeviceIDs(vm)
    if not local_ssd_disks:
      # No local ssd detected, will not move temp db.
      return

    for disk_number in local_ssd_disks:
      # For local SSD disk, set the status to online (if it is not already),
      # remove any formatting or partitioning on the disks, and convert
      # it to a dynamic disk so it can be used to create a volume.
      script += (
          'select disk %s\n'
          'online disk noerr\n'
          'attributes disk clear readonly\n'
          'clean\n'
          'convert gpt\n'
          'convert dynamic\n' % disk_number
      )
    if len(local_ssd_disks) > 1:
      script += 'create volume stripe disk=%s\n' % ','.join(local_ssd_disks)
    else:
      script += 'create volume simple\n'
    script += 'format fs=ntfs quick unit=64k\nassign letter={}\n'.format(
        self.TEMPDB_DISK_LETTER.lower()
    )
    vm.RunDiskpartScript(script)

    # Grant user permissions on the drive
    vm.RemoteCommand(
        'icacls {}: /grant Users:F /L'.format(self.TEMPDB_DISK_LETTER)
    )
    vm.RemoteCommand(
        'icacls {}: --% /grant Users:(OI)(CI)F /L'.format(
            self.TEMPDB_DISK_LETTER
        )
    )
    vm.RemoteCommand('mkdir {}:\\TEMPDB'.format(self.TEMPDB_DISK_LETTER))


class DetachDiskStrategy:
  """Strategies to detach disks from VM."""

  def __init__(self, vm: Any):
    self.vm = vm

  def CheckDisksDetach(self):
    """Checks for all the disks to be detached from the VM.

    Returns:
      True if the guest sees no non-boot disks.
    """
    return GetNonBootDiskCount(self.vm) == 0

  @vm_util.Retry(
      poll_interval=1,
      timeout=200,
      max_retries=200,
      retryable_exceptions=(DisksAreNotDetachedError,),
  )
  def WaitForDisksToDetachFromVm(self) -> float:
    """Waits for the disks to detach from the Guest.

    Returns:
      The wall-clock timestamp (time.time()) at which no non-boot disk was
      visible from the Guest. This is a point in time, not a duration.

    Raises:
      DisksAreNotDetachedError: if any disk is visible.
    """
    # Check once per poll; each check is a remote lsblk call.
    if not self.CheckDisksDetach():
      raise DisksAreNotDetachedError('Disks not detached')
    return time.time()

  def DetachDisks(self) -> float:
    """Detaches all scratch disks in parallel while waiting for the guest.

    Returns:
      The value returned by WaitForDisksToDetachFromVm: the wall-clock
      timestamp at which the guest no longer saw any non-boot disk.
    """
    detach_tasks = [
        (scratch_disk.Detach, (), {}) for scratch_disk in self.vm.scratch_disks
    ]
    detach_tasks.append((self.WaitForDisksToDetachFromVm, (), {}))
    time_returned = background_tasks.RunParallelThreads(
        detach_tasks, max_concurrency=200
    )
    # The wait task is appended last, so its result is the last element of
    # the (task-ordered) results. Index 1 was only correct for exactly one
    # scratch disk.
    return time_returned[-1]