perfkitbenchmarker/benchmark_spec.py

# Copyright 2019 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Container for all data required for a benchmark to run."""

import contextlib
import copy
import datetime
import importlib
import logging
import os
import pickle
import threading
from typing import Any
import uuid

from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import benchmark_status
from perfkitbenchmarker import capacity_reservation
from perfkitbenchmarker import cloud_tpu
from perfkitbenchmarker import cluster
from perfkitbenchmarker import container_service
from perfkitbenchmarker import context
from perfkitbenchmarker import data_discovery_service
from perfkitbenchmarker import disk
from perfkitbenchmarker import dpb_constants
from perfkitbenchmarker import dpb_service
from perfkitbenchmarker import edw_compute_resource
from perfkitbenchmarker import edw_service
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import flags as pkb_flags
from perfkitbenchmarker import key as cloud_key
from perfkitbenchmarker import managed_memory_store
from perfkitbenchmarker import messaging_service
from perfkitbenchmarker import nfs_service
from perfkitbenchmarker import non_relational_db
from perfkitbenchmarker import os_types
from perfkitbenchmarker import placement_group
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import providers
from perfkitbenchmarker import relational_db
from perfkitbenchmarker import resource as resource_type
from perfkitbenchmarker import resources  # pylint:disable=unused-import # Load the __init__.py
from perfkitbenchmarker import smb_service
from perfkitbenchmarker import stages
from perfkitbenchmarker import static_virtual_machine as static_vm
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker import vm_util
from perfkitbenchmarker import vpn_service
from perfkitbenchmarker.configs import benchmark_config_spec
from perfkitbenchmarker.configs import freeze_restore_spec
from perfkitbenchmarker.resources import base_job
from perfkitbenchmarker.resources import example_resource
from perfkitbenchmarker.resources import managed_ai_model
from perfkitbenchmarker.resources.pinecone import pinecone as pinecone_resource
import six
import six.moves._thread
import six.moves.copyreg


def PickleLock(lock: threading.Lock):
  return UnPickleLock, (lock.locked(),)


def UnPickleLock(locked: bool, *args):
  del args
  lock = threading.Lock()
  if locked:
    if not lock.acquire(False):
      raise pickle.UnpicklingError('Cannot acquire lock')
  return lock


six.moves.copyreg.pickle(six.moves._thread.LockType, PickleLock)

SUPPORTED = 'strict'
NOT_EXCLUDED = 'permissive'
SKIP_CHECK = 'none'

FLAGS = flags.FLAGS

flags.DEFINE_enum(
    'cloud',
    provider_info.GCP,
    provider_info.VALID_CLOUDS,
    'Name of the cloud to use.',
)
flags.DEFINE_string(
    'scratch_dir',
    None,
    'Base name for all scratch disk directories in the VM. '
    'Upon creation, these directories will have numbers '
    'appended to them (for example /scratch0, /scratch1, etc).',
)
flags.DEFINE_string(
    'startup_script', None, 'Script to run right after vm boot.'
)
flags.DEFINE_string(
    'postrun_script', None, 'Script to run right after run stage.'
)
flags.DEFINE_integer(
    'create_and_boot_post_task_delay',
    None,
    'Delay in seconds between boot tasks.',
)
_ENFORCE_DISK_MOUNT_POINT_OVERRIDE = flags.DEFINE_bool(
    'enforce_disk_mount_point_override',
    False,
    'Enforce the use of the --scratch_dir flag to override the default'
    ' mount_point in the disk spec.',
)
# pyformat: disable
# TODO(user): Delete this flag after fully updating gcl.
flags.DEFINE_enum('benchmark_compatibility_checking', SUPPORTED,
                  [SUPPORTED, NOT_EXCLUDED, SKIP_CHECK],
                  'Method used to check compatibility between the benchmark '
                  ' and the cloud. ' + SUPPORTED + ' runs the benchmark only'
                  ' if the cloud provider has declared it supported. ' +
                  NOT_EXCLUDED + ' runs the benchmark unless it has been'
                  ' declared not supported by the cloud provider. ' +
                  SKIP_CHECK + ' does not do the compatibility'
                  ' check.')
# pyformat: enable


class BenchmarkSpec:
  """Contains the various data required to make a benchmark run."""

  total_benchmarks = 0

  def __init__(
      self,
      benchmark_module: Any,
      benchmark_config: benchmark_config_spec.BenchmarkConfigSpec,
      benchmark_uid: str,
  ):
    """Initialize a BenchmarkSpec object.

    Args:
      benchmark_module: The benchmark module object.
      benchmark_config: BenchmarkConfigSpec. The configuration for the
        benchmark.
      benchmark_uid: An identifier unique to this run of the benchmark even if
        the same benchmark is run multiple times with different configs.
    """
    self.config = benchmark_config
    self.name = benchmark_module.BENCHMARK_NAME
    self.uid = benchmark_uid
    self.status = benchmark_status.SKIPPED
    self.failed_substatus = None
    self.status_detail = None
    BenchmarkSpec.total_benchmarks += 1
    self.sequence_number = BenchmarkSpec.total_benchmarks
    self.resources: list[resource_type.BaseResource] = []
    self.vms = []
    self.regional_networks = {}
    self.networks = {}
    self.custom_subnets = {
        k: {'cloud': v.cloud, 'cidr': v.cidr}
        for (k, v) in self.config.vm_groups.items()
    }
    self.firewalls = {}
    self.networks_lock = threading.Lock()
    self.firewalls_lock = threading.Lock()
    self.vm_groups = {}
    self.container_specs = benchmark_config.container_specs or {}
    self.container_registry = None
    self.deleted = False
    self.uuid = '%s-%s' % (FLAGS.run_uri, uuid.uuid4())
    self.always_call_cleanup = pkb_flags.ALWAYS_CALL_CLEANUP.value
    self.dpb_service: dpb_service.BaseDpbService = None
    self.container_cluster: container_service.BaseContainerCluster = None
    self.cluster: cluster.BaseCluster = None
    self.key = None
    self.relational_db = None
    self.non_relational_db = None
    self.tpus = []
    self.tpu_groups = {}
    self.base_job = None
    self.edw_service = None
    self.edw_compute_resource = None
    self.example_resource = None
    self.nfs_service = None
    self.smb_service = None
    self.messaging_service = None
    self.ai_model = None
    self.pinecone = None
    self.memory_store = None
    self.data_discovery_service = None
    self.app_groups = {}
    self._zone_index = 0
    self.capacity_reservations = []
    self.placement_group_specs = benchmark_config.placement_group_specs or {}
    self.placement_groups = {}
    # Benchmark configs for relational_db and vm_groups can both be included.
    # This is currently used for combo benchmarks.
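    # vms_to_boot maps each VM group name to the spec PKB itself must
    # provision; groups coming from the relational_db config and from
    # vm_groups are merged into the same dict below.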
    self.vms_to_boot = {}
    if self.config.relational_db:
      self.vms_to_boot.update(
          relational_db.VmsToBoot(self.config.relational_db.vm_groups)
      )
    if self.config.vm_groups:
      self.vms_to_boot.update(self.config.vm_groups)
    self.vpc_peering = self.config.vpc_peering
    self.vpn_service = None
    self.vpns = {}  # dict of vpn's
    self.vpn_gateways = {}  # dict of vpn gw's
    self.vpn_gateways_lock = threading.Lock()
    self.vpns_lock = threading.Lock()
    self.restore_spec = None
    self.freeze_path = None
    # Used by huggingface_bert_pretraining
    self.path: str
    self.run_dir: str
    self.data_dir: str
    self.ckpt_dir: str
    # Used by redis_memtier and keydb_memtier
    self.redis_endpoint_ip: str
    self.keydb_endpoint_ip: str
    # Used by mongodb_ycsb
    self.mongodb_url: str
    # Used by dino_benchmark
    self.imagenet_dir: str
    # Used by mlperf_inference_cpu_benchmark
    self.cm: str

    # Modules can't be pickled, but functions can, so we store the functions
    # necessary to run the benchmark.
    self.BenchmarkPrepare = benchmark_module.Prepare
    self.BenchmarkRun = benchmark_module.Run
    self.BenchmarkCleanup = benchmark_module.Cleanup

    # Set the current thread's BenchmarkSpec object to this one.
    context.SetThreadBenchmarkSpec(self)

  def __repr__(self):
    return '%s(%r)' % (self.__class__, self.__dict__)

  def __str__(self):
    return 'Benchmark name: {}\nFlags: {}'.format(self.name, self.config.flags)

  @contextlib.contextmanager
  def RedirectGlobalFlags(self):
    """Redirects flag reads and writes to the benchmark-specific flags object.

    Within the enclosed code block, reads and writes to the flags.FLAGS object
    are redirected to a copy that has been merged with config-provided flag
    overrides specific to this benchmark run.
    """
    with self.config.RedirectFlags(FLAGS):
      yield

  def _InitializeFromSpec(
      self,
      attribute_name: str,
      resource_spec: freeze_restore_spec.FreezeRestoreSpec,
  ) -> bool:
    """Initializes the BenchmarkSpec attribute from the restore_spec.

    Args:
      attribute_name: The attribute to restore.
      resource_spec: The spec class corresponding to the resource to be
        restored.

    Returns:
      True if successful, False otherwise.
    """
    if not hasattr(self, 'restore_spec'):
      return False
    if not self.restore_spec or not hasattr(self.restore_spec, attribute_name):
      return False
    if not resource_spec.enable_freeze_restore:
      return False
    logging.info('Getting %s instance from restore_spec', attribute_name)
    frozen_resource = copy.copy(getattr(self.restore_spec, attribute_name))
    setattr(self, attribute_name, frozen_resource)
    return True

  def ConstructResources(self):
    """Constructs the resources for the benchmark."""
    self.ConstructContainerCluster()
    self.ConstructContainerRegistry()
    # dpb service needs to go first, because it adds some vms.
    self.ConstructDpbService()
    self.ConstructCluster()
    self.ConstructVirtualMachines()
    self.ConstructRelationalDb()
    self.ConstructNonRelationalDb()
    self.ConstructKey()
    self.ConstructMessagingService()
    self.ConstructManagedAiModel()
    # CapacityReservations need to be constructed after VirtualMachines because
    # it needs information about the VMs (machine type, count, zone, etc). The
    # CapacityReservations will be provisioned before VMs.
    self.ConstructCapacityReservations()
    self.ConstructTpu()
    self.ConstructEdwService()
    self.ConstructEdwComputeResource()
    self.ConstructExampleResource()
    self.ConstructBaseJob()
    self.ConstructVPNService()
    self.ConstructNfsService()
    self.ConstructSmbService()
    self.ConstructDataDiscoveryService()
    self.ConstructMemoryStore()
    self.ConstructPinecone()

  def ConstructContainerCluster(self):
    """Create the container cluster."""
    if self.config.container_cluster is None:
      return
    cloud = self.config.container_cluster.cloud
    cluster_type = self.config.container_cluster.type
    providers.LoadProvider(cloud)
    container_cluster_class = container_service.GetContainerClusterClass(
        cloud, cluster_type
    )
    self.container_cluster = container_cluster_class(
        self.config.container_cluster
    )
    self.resources.append(self.container_cluster)

  def ConstructCluster(self):
    """Create the cluster."""
    if self.config.cluster is None:
      return
    cloud = self.config.cluster.cloud
    providers.LoadProvider(cloud)
    cluster_class = cluster.GetClusterClass(cloud)
    self.cluster = cluster_class(self.config.cluster)
    self.resources.append(self.cluster)
    self.vms_to_boot.update(self.cluster.ExportVmGroupsForUnmanagedProvision())

  def ConstructContainerRegistry(self):
    """Create the container registry."""
    if self.config.container_registry is None:
      return
    cloud = self.config.container_registry.cloud
    providers.LoadProvider(cloud)
    container_registry_class = container_service.GetContainerRegistryClass(
        cloud
    )
    self.container_registry = container_registry_class(
        self.config.container_registry
    )
    self.resources.append(self.container_registry)

  def ConstructDpbService(self):
    """Create the dpb_service object and create groups for its vms."""
    if self.config.dpb_service is None:
      return
    dpb_service_spec = self.config.dpb_service
    dpb_service_cloud = dpb_service_spec.worker_group.cloud
    dpb_service_spec.worker_group.vm_count = dpb_service_spec.worker_count
    providers.LoadProvider(dpb_service_cloud)
    dpb_service_type = dpb_service_spec.service_type
    dpb_service_class = dpb_service.GetDpbServiceClass(
        dpb_service_cloud, dpb_service_type
    )
    self.dpb_service = dpb_service_class(dpb_service_spec)

    # If the dpb service is un-managed, the provisioning needs to be handed
    # over to the vm creation module.
    if dpb_service_type in [
        dpb_constants.UNMANAGED_DPB_SVC_YARN_CLUSTER,
        dpb_constants.UNMANAGED_SPARK_CLUSTER,
    ]:
      # Ensure non cluster vms are not present in the spec.
      if self.vms_to_boot:
        raise errors.Benchmarks.UnsupportedConfigError(
            'Invalid Non cluster vm group {} when benchmarking '
            'unmanaged dpb service'.format(self.vms_to_boot)
        )

      base_vm_spec = dpb_service_spec.worker_group
      base_vm_spec.vm_spec.zone = self.dpb_service.dpb_service_zone

      if dpb_service_spec.worker_count:
        self.vms_to_boot['worker_group'] = dpb_service_spec.worker_group
      # else we have a single node cluster.
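      # The master group is always a single VM with no scratch disk; it is
      # provisioned through the regular VM machinery rather than by the
      # managed service.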
      master_group_spec = copy.copy(base_vm_spec)
      master_group_spec.vm_count = 1
      master_group_spec.disk_spec = None
      self.vms_to_boot['master_group'] = master_group_spec
    self.resources.append(self.dpb_service)

  def ConstructRelationalDb(self):
    """Creates the relational db and creates groups for its vms."""
    if self.config.relational_db is None:
      return
    if self._InitializeFromSpec('relational_db', self.config.relational_db):
      return
    cloud = self.config.relational_db.cloud
    is_managed_db = self.config.relational_db.is_managed_db
    engine = self.config.relational_db.engine
    providers.LoadProvider(cloud)
    relational_db_class = relational_db.GetRelationalDbClass(
        cloud, is_managed_db, engine
    )
    if not self.config.relational_db.engine_version:
      self.config.relational_db.engine_version = (
          relational_db_class.GetDefaultEngineVersion(engine)
      )
    self.relational_db = relational_db_class(self.config.relational_db)
    self.resources.append(self.relational_db)

  def ConstructNonRelationalDb(self) -> None:
    """Initializes the non_relational db."""
    db_spec: non_relational_db.BaseNonRelationalDbSpec = (
        self.config.non_relational_db
    )
    if not db_spec:
      return
    # Initialization from restore spec
    if self._InitializeFromSpec('non_relational_db', db_spec):
      return
    # Initialization from benchmark config spec
    logging.info(
        'Constructing non_relational_db instance with spec: %s.', db_spec
    )
    service_type = db_spec.service_type
    non_relational_db_class = non_relational_db.GetNonRelationalDbClass(
        service_type
    )
    self.non_relational_db = non_relational_db_class.FromSpec(db_spec)

  def ConstructKey(self) -> None:
    """Initializes the cryptographic key."""
    key_spec: cloud_key.BaseKeySpec = self.config.key
    if not key_spec:
      return
    logging.info('Constructing key with spec: %s.', key_spec)
    key_class = cloud_key.GetKeyClass(key_spec.cloud)
    self.key = key_class(key_spec)
    self.resources.append(self.key)

  def ConstructTpuGroup(self, group_spec):
    """Constructs the BenchmarkSpec's cloud TPU objects."""
    if group_spec is None:
      return
    cloud = group_spec.cloud
    providers.LoadProvider(cloud)
    tpu_class = cloud_tpu.GetTpuClass(cloud)
    return tpu_class(group_spec)  # pytype: disable=not-instantiable

  def ConstructTpu(self):
    """Constructs the BenchmarkSpec's cloud TPU objects."""
    tpu_group_specs = self.config.tpu_groups
    for group_name, group_spec in sorted(tpu_group_specs.items()):
      tpu = self.ConstructTpuGroup(group_spec)
      self.tpu_groups[group_name] = tpu
      self.tpus.append(tpu)

  def ConstructEdwService(self):
    """Create the edw_service object."""
    if self.config.edw_service is None:
      return
    # Load necessary modules from the provider to account for dependencies
    # TODO(saksena): Replace with
    # providers.LoadProvider(string.lower(FLAGS.cloud))
    providers.LoadProvider(
        edw_service.TYPE_2_PROVIDER.get(self.config.edw_service.type)
    )
    # Load the module for the edw service based on type
    edw_service_type = self.config.edw_service.type
    edw_service_module = importlib.import_module(
        edw_service.TYPE_2_MODULE.get(edw_service_type)
    )
    # The edw_service_type in certain cases may be qualified with a hosting
    # cloud, e.g. snowflake_aws, snowflake_gcp, etc.
    # However, the edw_service_class_name in all cases will still be cloud
    # agnostic, e.g. Snowflake.
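    # e.g. 'snowflake_aws'.split('_')[0] yields 'snowflake', which is
    # capitalized below to look up the 'Snowflake' class.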
    edw_service_class_name = edw_service_type.split('_')[0]
    edw_service_class = getattr(
        edw_service_module,
        edw_service_class_name[0].upper() + edw_service_class_name[1:],
    )
    # Check if a new instance needs to be created or restored from snapshot
    self.edw_service = edw_service_class(
        self.config.edw_service
    )  # pytype: disable=not-instantiable

  def ConstructEdwComputeResource(self):
    """Create an edw_compute_resource object."""
    if self.config.edw_compute_resource is None:
      return
    edw_compute_resource_cloud = self.config.edw_compute_resource.cloud
    edw_compute_resource_type = self.config.edw_compute_resource.type
    providers.LoadProvider(edw_compute_resource_cloud)
    edw_compute_resource_class = (
        edw_compute_resource.GetEdwComputeResourceClass(
            edw_compute_resource_cloud, edw_compute_resource_type
        )
    )
    self.edw_compute_resource = edw_compute_resource_class(
        self.config.edw_compute_resource
    )  # pytype: disable=not-instantiable
    self.resources.append(self.edw_compute_resource)

  def ConstructExampleResource(self):
    """Create an example_resource object. Also called from pkb.py."""
    if self.config.example_resource is None:
      return
    example_resource_type = self.config.example_resource.example_type
    example_resource_class = example_resource.GetExampleResourceClass(
        example_resource_type
    )
    self.example_resource = example_resource_class(
        self.config.example_resource
    )  # pytype: disable=not-instantiable
    self.resources.append(self.example_resource)

  def ConstructBaseJob(self):
    """Create an instance of the base job. It is also called from pkb.py."""
    if self.config.base_job is None:
      return
    job_type = self.config.base_job.job_type
    cloud = self.config.base_job.CLOUD
    providers.LoadProvider(cloud)
    job_class = base_job.GetJobClass(job_type)
    self.base_job = job_class(
        self.config.base_job, self.container_registry
    )  # pytype: disable=not-instantiable
    self.resources.append(self.base_job)

  def ConstructManagedAiModel(self):
    """Create a managed AI model object. Also called from pkb.py."""
    if self.config.ai_model is None:
      return
    cloud = self.config.ai_model.cloud
    providers.LoadProvider(cloud)
    model_class = managed_ai_model.GetManagedAiModelClass(
        cloud, self.config.ai_model.interface
    )
    assert self.vm_groups
    vm = self.vm_groups[
        'clients' if 'clients' in self.vm_groups else 'default'
    ][0]
    self.ai_model = model_class(
        vm, self.config.ai_model
    )  # pytype: disable=not-instantiable
    self.resources.append(self.ai_model)

  def ConstructPinecone(self):
    """Create a Pinecone resource object. Also called from pkb.py."""
    if self.config.pinecone is None:
      return
    cloud = self.config.pinecone.cloud
    providers.LoadProvider(cloud)
    model_class = pinecone_resource.GetPineconeResourceClass(cloud)
    self.pinecone = model_class(
        self.config.pinecone
    )  # pytype: disable=not-instantiable
    self.pinecone.SetVms(self.vm_groups)
    self.resources.append(self.pinecone)

  def ConstructMemoryStore(self):
    """Create the memory store instance."""
    if self.config.memory_store is None:
      return
    cloud = self.config.memory_store.cloud
    providers.LoadProvider(cloud)
    managed_memory_store_class = (
        managed_memory_store.GetManagedMemoryStoreClass(
            cloud,
            self.config.memory_store.service_type,
            self.config.memory_store.memory_store_type,
        )
    )
    self.memory_store = managed_memory_store_class(
        self.config.memory_store
    )  # pytype: disable=not-instantiable
    self.memory_store.SetVms(self.vm_groups)
    self.resources.append(self.memory_store)

  def ConstructNfsService(self):
    """Construct the NFS service object.

    Creates an NFS Service only if an NFS disk is found in the disk_specs.
    """
    if self.nfs_service:
      logging.info('NFS service already created: %s', self.nfs_service)
      return
    for group_spec in self.vms_to_boot.values():
      if not group_spec.disk_spec or not group_spec.vm_count:
        continue
      disk_spec = group_spec.disk_spec
      if disk_spec.disk_type != disk.NFS:
        continue
      # Choose which nfs_service to create.
      if disk_spec.nfs_ip_address:
        self.nfs_service = nfs_service.StaticNfsService(disk_spec)
      elif disk_spec.nfs_managed:
        cloud = group_spec.cloud
        providers.LoadProvider(cloud)
        nfs_class = nfs_service.GetNfsServiceClass(cloud)
        self.nfs_service = nfs_class(
            disk_spec, group_spec.vm_spec.zone
        )  # pytype: disable=not-instantiable
      else:
        self.nfs_service = nfs_service.UnmanagedNfsService(
            disk_spec, self.vms[0]
        )
      logging.debug('NFS service %s', self.nfs_service)
      break

  def ConstructSmbService(self):
    """Construct the SMB service object.

    Creates an SMB Service only if an SMB disk is found in the disk_specs.
    """
    if self.smb_service:
      logging.info('SMB service already created: %s', self.smb_service)
      return
    for group_spec in self.vms_to_boot.values():
      if not group_spec.disk_spec or not group_spec.vm_count:
        continue
      disk_spec = group_spec.disk_spec
      if disk_spec.disk_type != disk.SMB:
        continue
      cloud = group_spec.cloud
      providers.LoadProvider(cloud)
      smb_class = smb_service.GetSmbServiceClass(cloud)
      self.smb_service = smb_class(
          disk_spec, group_spec.vm_spec.zone
      )  # pytype: disable=not-instantiable
      logging.debug('SMB service %s', self.smb_service)
      break

  def ConstructVirtualMachineGroup(
      self, group_name, group_spec
  ) -> list[virtual_machine.VirtualMachine]:
    """Construct the virtual machine(s) needed for a group."""
    vms = []

    vm_count = group_spec.vm_count

    # First create the Static VM objects.
    if group_spec.static_vms:
      specs = [
          spec
          for spec in group_spec.static_vms
          if (FLAGS.static_vm_tags is None or spec.tag in FLAGS.static_vm_tags)
      ][:vm_count]
      for vm_spec in specs:
        static_vm_class = static_vm.GetStaticVmClass(vm_spec.os_type)
        vms.append(static_vm_class(vm_spec))  # pytype: disable=not-instantiable

    os_type = group_spec.os_type
    cloud = group_spec.cloud

    # This throws an exception if the benchmark is not supported.
    self._CheckBenchmarkSupport(cloud)

    # Then create the remaining VM objects using VM and disk specs.
    if group_spec.disk_spec:
      disk_spec = group_spec.disk_spec
    else:
      disk_spec = None

    if group_spec.placement_group_name:
      group_spec.vm_spec.placement_group = self.placement_groups[
          group_spec.placement_group_name
      ]

    for _ in range(vm_count - len(vms)):
      # Assign a zone to each VM sequentially from the --zones flag.
      if FLAGS.zone:
        zone_list = FLAGS.zone
        group_spec.vm_spec.zone = zone_list[self._zone_index]
        self._zone_index = (
            self._zone_index + 1 if self._zone_index < len(zone_list) - 1 else 0
        )
      if group_spec.cidr:  # apply cidr range to all vms in vm_group
        group_spec.vm_spec.cidr = group_spec.cidr
      vm = self._CreateVirtualMachine(group_spec.vm_spec, os_type, cloud)
      vm.vm_group = group_name
      if disk_spec and not vm.is_static:
        if _ENFORCE_DISK_MOUNT_POINT_OVERRIDE.value:
          disk_spec.mount_point = FLAGS.scratch_dir
        vm.SetDiskSpec(disk_spec, group_spec.disk_count)
      vms.append(vm)

    return vms

  def ConstructCapacityReservations(self):
    """Construct capacity reservations for each VM group."""
    if not FLAGS.use_capacity_reservations:
      return
    for vm_group in self.vm_groups.values():
      cloud = vm_group[0].CLOUD
      providers.LoadProvider(cloud)
      capacity_reservation_class = capacity_reservation.GetResourceClass(cloud)
      self.capacity_reservations.append(
          capacity_reservation_class(vm_group)  # pytype: disable=not-instantiable
      )

  def _CheckBenchmarkSupport(self, cloud):
    """Throw an exception if the benchmark isn't supported."""
    if FLAGS.benchmark_compatibility_checking == SKIP_CHECK:
      return
    provider_info_class = provider_info.GetProviderInfoClass(cloud)
    benchmark_ok = provider_info_class.IsBenchmarkSupported(self.name)
    if FLAGS.benchmark_compatibility_checking == NOT_EXCLUDED:
      if benchmark_ok is None:
        benchmark_ok = True
    if not benchmark_ok:
      raise ValueError(
          'Provider {} does not support {}. Use '
          '--benchmark_compatibility_checking=none '
          'to override this check.'.format(provider_info_class.CLOUD, self.name)
      )

  def _ConstructJujuController(
      self, group_spec
  ) -> virtual_machine.VirtualMachine:
    """Construct a VirtualMachine object for a Juju controller."""
    juju_spec = copy.copy(group_spec)
    juju_spec.vm_count = 1
    jujuvms = self.ConstructVirtualMachineGroup('juju', juju_spec)
    if len(jujuvms):
      jujuvm = jujuvms.pop()
      jujuvm.is_controller = True
      return jujuvm
    raise errors.Config.ParseError(
        'Attempted to construct Juju VM but none were created.'
    )

  def ConstructVirtualMachines(self):
    """Constructs the BenchmarkSpec's VirtualMachine objects."""
    self.ConstructPlacementGroups()

    vm_group_specs = self.vms_to_boot

    clouds = {}
    for group_name, group_spec in sorted(vm_group_specs.items()):
      vms = self.ConstructVirtualMachineGroup(group_name, group_spec)

      if group_spec.os_type.startswith('juju'):
        # The Juju VM needs to be created first, so that subsequent units can
        # be properly added under its control.
        if group_spec.cloud in clouds:
          jujuvm = clouds[group_spec.cloud]
        else:
          jujuvm = self._ConstructJujuController(group_spec)
          clouds[group_spec.cloud] = jujuvm

        for vm in vms:
          vm.controller = clouds[group_spec.cloud]
        jujuvm.units.extend(vms)  # pytype: disable=attribute-error
        if jujuvm and jujuvm not in self.vms:
          self.vms.extend([jujuvm])
          self.vm_groups['%s_juju_controller' % group_spec.cloud] = [jujuvm]

      self.vm_groups[group_name] = vms
      self.vms.extend(vms)
    # In the case of an un-managed yarn cluster, for hadoop software
    # installation, the dpb service instance needs access to constructed
    # master group and worker group.
    if self.config.dpb_service and self.config.dpb_service.service_type in [
        dpb_constants.UNMANAGED_DPB_SVC_YARN_CLUSTER,
        dpb_constants.UNMANAGED_SPARK_CLUSTER,
    ]:
      self.dpb_service.vms['master_group'] = self.vm_groups['master_group']
      if self.config.dpb_service.worker_count:
        self.dpb_service.vms['worker_group'] = self.vm_groups['worker_group']
      else:  # single node cluster
        self.dpb_service.vms['worker_group'] = []

  def ConstructPlacementGroups(self):
    for placement_group_name, placement_group_spec in (
        self.placement_group_specs
    ).items():
      self.placement_groups[placement_group_name] = self._CreatePlacementGroup(
          placement_group_spec, placement_group_spec.CLOUD
      )

  def ConstructVPNService(self):
    """Create the VPNService object."""
    if self.config.vpn_service is None:
      return
    self.vpn_service = vpn_service.VPNService(self.config.vpn_service)

  def ConstructMessagingService(self):
    """Create the messaging_service object.

    Assumes VMs are already constructed.
    """
    if self.config.messaging_service is None:
      return
    cloud = self.config.messaging_service.cloud
    delivery = self.config.messaging_service.delivery
    providers.LoadProvider(cloud)
    messaging_service_class = messaging_service.GetMessagingServiceClass(
        cloud, delivery
    )
    self.messaging_service = messaging_service_class.FromSpec(
        self.config.messaging_service
    )
    self.messaging_service.setVms(self.vm_groups)

  def ConstructDataDiscoveryService(self):
    """Create the data_discovery_service object."""
    if not self.config.data_discovery_service:
      return
    cloud = self.config.data_discovery_service.cloud
    service_type = self.config.data_discovery_service.service_type
    providers.LoadProvider(cloud)
    data_discovery_service_class = (
        data_discovery_service.GetDataDiscoveryServiceClass(cloud, service_type)
    )
    self.data_discovery_service = data_discovery_service_class.FromSpec(
        self.config.data_discovery_service
    )

  def Prepare(self):
    targets = [(vm.PrepareBackgroundWorkload, (), {}) for vm in self.vms]
    background_tasks.RunParallelThreads(targets, len(targets))

  def Provision(self):
    """Prepares the VMs and networks necessary for the benchmark to run."""
    should_restore = (
        hasattr(self, 'restore_spec') and self.restore_spec is not None
    )
    # Create capacity reservations if the cloud supports it. Note that the
    # capacity reservation class may update the VMs themselves. This is true
    # on AWS, because the VM needs to be aware of the capacity reservation id
    # before its Create() method is called. Furthermore, if the user does not
    # specify an AWS zone, but a region instead, the AwsCapacityReservation
    # class will make a reservation in a zone that has sufficient capacity.
    # In this case the VM's zone attribute, and the VMs network instance
    # need to be updated as well.
    if self.capacity_reservations:
      background_tasks.RunThreaded(
          lambda res: res.Create(), self.capacity_reservations
      )

    # Sort networks into a guaranteed order of creation based on dict key.
    # There is a finite limit on the number of threads that are created to
    # provision networks. Until support is added to provision resources in an
    # order based on dependencies, this key ordering can be used to avoid
    # deadlock by placing dependent networks later and their dependencies
    # earlier.
    networks = [self.networks[key] for key in sorted(self.networks.keys())]
    background_tasks.RunThreaded(lambda net: net.Create(), networks)

    # VPC peering is currently only supported for connecting 2 VPC networks.
    if self.vpc_peering:
      if len(networks) > 2:
        raise errors.Error(
            'Networks of size %d are not currently supported.'
            % (len(networks))
        )
      # Ignore Peering for one network
      elif len(networks) == 2:
        networks[0].Peer(networks[1])

    if self.container_registry:
      self.container_registry.Create()
      for container_spec in self.container_specs.values():
        if container_spec.static_image:
          continue
        container_spec.image = self.container_registry.GetOrBuild(
            container_spec.image
        )

    if self.container_cluster:
      self.container_cluster.Create()

    # Do this after network setup but before the VMs are created.
    if self.nfs_service and self.nfs_service.CLOUD != nfs_service.UNMANAGED:
      self.nfs_service.Create()
    if self.smb_service:
      self.smb_service.Create()

    for placement_group_object in self.placement_groups.values():
      placement_group_object.Create()

    if self.vms:
      # We separate out creating, booting, and preparing the VMs into two
      # phases so that we don't slow down the creation of all the VMs by
      # running commands on the VMs that have already booted.
      background_tasks.RunThreaded(
          lambda vm: vm.CreateAndBoot(),
          self.vms,
          post_task_delay=FLAGS.create_and_boot_post_task_delay,
      )
      if self.nfs_service and self.nfs_service.CLOUD == nfs_service.UNMANAGED:
        self.nfs_service.Create()
      if not FLAGS.skip_vm_preparation:
        background_tasks.RunThreaded(lambda vm: vm.PrepareAfterBoot(), self.vms)
      else:
        logging.info('Skipping VM preparation.')
    sshable_vms = [
        vm for vm in self.vms if vm.OS_TYPE not in os_types.WINDOWS_OS_TYPES
    ]
    sshable_vm_groups = {}
    for group_name, group_vms in self.vm_groups.items():
      sshable_vm_groups[group_name] = [
          vm for vm in group_vms if vm.OS_TYPE not in os_types.WINDOWS_OS_TYPES
      ]
    vm_util.GenerateSSHConfig(sshable_vms, sshable_vm_groups)

    if self.cluster:
      if self.cluster.unmanaged:
        self.cluster.ImportVmGroups(
            self.vm_groups['headnode'][0], self.vm_groups['workers']
        )
      self.cluster.Create()
    if self.dpb_service:
      self.dpb_service.Create()
    if hasattr(self, 'relational_db') and self.relational_db:
      self.relational_db.SetVms(self.vm_groups)
      self.relational_db.Create(restore=should_restore)
    if self.non_relational_db:
      self.non_relational_db.Create(restore=should_restore)
    if hasattr(self, 'key') and self.key:
      self.key.Create()
    if self.tpus:
      background_tasks.RunThreaded(lambda tpu: tpu.Create(), self.tpus)
    if self.ai_model:
      self.ai_model.Create()
    if self.pinecone:
      self.pinecone.Create()
    if self.edw_service:
      if (
          not self.edw_service.user_managed
          and self.edw_service.SERVICE_TYPE == 'redshift'
      ):
        # The benchmark creates the Redshift cluster's subnet group in the
        # already provisioned virtual private cloud (vpc).
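        # Reuse the subnet of the provisioned AwsNetwork for the cluster's
        # subnet group.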
        for network in networks:
          if network.__class__.__name__ == 'AwsNetwork':
            self.edw_service.cluster_subnet_group.subnet_id = network.subnet.id
      self.edw_service.Create()
    if self.edw_compute_resource:
      self.edw_compute_resource.Create()
    if hasattr(self, 'memory_store') and self.memory_store:
      self.memory_store.Create()
    if self.example_resource:
      self.example_resource.Create()
    if self.base_job:
      self.base_job.Create()
    if self.vpn_service:
      self.vpn_service.Create()
    if hasattr(self, 'messaging_service') and self.messaging_service:
      self.messaging_service.Create()
    if self.data_discovery_service:
      self.data_discovery_service.Create()

  def Delete(self):
    if self.deleted:
      return

    should_freeze = (
        hasattr(self, 'freeze_path') and self.freeze_path is not None
    )
    if should_freeze:
      self.Freeze()

    if self.container_registry:
      self.container_registry.Delete()
    if self.dpb_service:
      self.dpb_service.Delete()
    if hasattr(self, 'relational_db') and self.relational_db:
      self.relational_db.Delete(freeze=should_freeze)
    if hasattr(self, 'non_relational_db') and self.non_relational_db:
      self.non_relational_db.Delete(freeze=should_freeze)
    if hasattr(self, 'key') and self.key:
      self.key.Delete()
    if self.tpus:
      background_tasks.RunThreaded(lambda tpu: tpu.Delete(), self.tpus)
    if self.edw_service:
      self.edw_service.Delete()
    if hasattr(self, 'edw_compute_resource') and self.edw_compute_resource:
      self.edw_compute_resource.Delete()
    if self.example_resource:
      self.example_resource.Delete()
    if self.base_job:
      self.base_job.Delete()
    if self.nfs_service:
      self.nfs_service.Delete()
    if self.smb_service:
      self.smb_service.Delete()
    if hasattr(self, 'messaging_service') and self.messaging_service:
      self.messaging_service.Delete()
    if hasattr(self, 'memory_store') and self.memory_store:
      self.memory_store.Delete()
    if hasattr(self, 'ai_model') and self.ai_model:
      self.ai_model.Delete()
    if hasattr(self, 'data_discovery_service') and self.data_discovery_service:
      self.data_discovery_service.Delete()
    if hasattr(self, 'pinecone') and self.pinecone:
      self.pinecone.Delete()

    # Note: It is ok to delete capacity reservations before deleting the VMs,
    # and will actually save money (mere seconds of usage).
    if self.capacity_reservations:
      try:
        background_tasks.RunThreaded(
            lambda reservation: reservation.Delete(), self.capacity_reservations
        )
      except Exception:  # pylint: disable=broad-except
        logging.exception(
            'Got an exception deleting CapacityReservations. '
            'Attempting to continue tearing down.'
        )

    if self.vms:
      try:
        # Delete VMs first to detach any multi-attached disks.
        background_tasks.RunThreaded(self.DeleteVm, self.vms)
        background_tasks.RunThreaded(
            lambda vm: vm.DeleteScratchDisks(), self.vms
        )
      except Exception:
        logging.exception(
            'Got an exception deleting VMs. '
            'Attempting to continue tearing down.'
        )

    if hasattr(self, 'placement_groups'):
      for placement_group_object in self.placement_groups.values():
        placement_group_object.Delete()

    for firewall in self.firewalls.values():
      try:
        firewall.DisallowAllPorts()
      except Exception:
        logging.exception(
            'Got an exception disabling firewalls. '
            'Attempting to continue tearing down.'
        )

    if self.container_cluster:
      self.container_cluster.DeleteServices()
      self.container_cluster.DeleteContainers()
      self.container_cluster.Delete()

    if self.cluster:
      self.cluster.Delete()

    for net in self.networks.values():
      try:
        net.Delete()
      except Exception:
        logging.exception(
            'Got an exception deleting networks. '
            'Attempting to continue tearing down.'
        )

    if hasattr(self, 'vpn_service') and self.vpn_service:
      self.vpn_service.Delete()

    self.deleted = True

  def GetSamples(self):
    """Returns samples created from benchmark resources."""
    samples = []
    for resource in self.resources:
      samples.extend(resource.GetSamples())
    return samples

  def StartBackgroundWorkload(self):
    targets = [(vm.StartBackgroundWorkload, (), {}) for vm in self.vms]
    background_tasks.RunParallelThreads(targets, len(targets))

  def StopBackgroundWorkload(self):
    targets = [(vm.StopBackgroundWorkload, (), {}) for vm in self.vms]
    background_tasks.RunParallelThreads(targets, len(targets))

  def _IsSafeKeyOrValueCharacter(self, char):
    return char.isalpha() or char.isnumeric() or char == '_'

  def _SafeLabelKeyOrValue(self, key):
    result = ''.join(
        c if self._IsSafeKeyOrValueCharacter(c) else '_' for c in key.lower()
    )

    # max length constraints on keys and values
    # https://cloud.google.com/resource-manager/docs/creating-managing-labels
    max_safe_length = 63

    # GCP labels are not allowed to start or end with '_'
    return result[:max_safe_length].strip('_')

  def _GetResourceDict(self, time_format, timeout_minutes=None):
    """Gets a dict of tags to be used to tag resources."""
    now_utc = datetime.datetime.utcnow()

    if not timeout_minutes:
      timeout_minutes = FLAGS.timeout_minutes

    timeout_utc = now_utc + datetime.timedelta(minutes=timeout_minutes)

    tags = {
        resource_type.TIMEOUT_METADATA_KEY: timeout_utc.strftime(time_format),
        'create_time_utc': now_utc.strftime(time_format),
        'benchmark': self.name,
        'perfkit_uuid': self.uuid,
        'owner': FLAGS.owner,
        'benchmark_uid': self.uid,
    }

    # add metadata key value pairs
    metadata_dict = (
        flag_util.ParseKeyValuePairs(FLAGS.metadata)
        if hasattr(FLAGS, 'metadata')
        else dict()
    )
    for key, value in metadata_dict.items():
      tags[self._SafeLabelKeyOrValue(key)] = self._SafeLabelKeyOrValue(value)

    return tags

  def GetResourceTags(self, timeout_minutes=None):
    """Gets a dict of tags to be used to tag resources."""
    return self._GetResourceDict(
        resource_type.METADATA_TIME_FORMAT, timeout_minutes
    )

  def _CreatePlacementGroup(self, placement_group_spec, cloud):
    """Create a placement group in zone.

    Args:
      placement_group_spec: A placement_group.BasePlacementGroupSpec object.
      cloud: The cloud for the placement group. See the flag of the same name
        for more information.

    Returns:
      A placement_group.BasePlacementGroup object.
    """
    placement_group_class = placement_group.GetPlacementGroupClass(cloud)
    if placement_group_class:
      return placement_group_class(
          placement_group_spec
      )  # pytype: disable=not-instantiable
    else:
      return None

  def _CreateVirtualMachine(self, vm_spec, os_type, cloud):
    """Create a vm in zone.

    Args:
      vm_spec: A virtual_machine.BaseVmSpec object.
      os_type: The type of operating system for the VM. See the flag of the
        same name for more information.
      cloud: The cloud for the VM. See the flag of the same name for more
        information.

    Returns:
      A virtual_machine.BaseVirtualMachine object.
    """
    vm = static_vm.StaticVirtualMachine.GetStaticVirtualMachine()
    if vm:
      return vm

    vm_class = virtual_machine.GetVmClass(cloud, os_type)
    if vm_class is None:
      raise errors.Error(
          'VMs of type "%s" are not currently supported on cloud "%s".'
          % (os_type, cloud)
      )

    return vm_class(vm_spec)  # pytype: disable=not-instantiable

  def DeleteVm(self, vm):
    """Deletes a single vm and scratch disk if required.

    Args:
      vm: The BaseVirtualMachine object representing the VM.
    """
    if vm.is_static and vm.install_packages:
      vm.PackageCleanup()
    vm.Delete()

  @staticmethod
  def _GetPickleFilename(uid):
    """Returns the filename for the pickled BenchmarkSpec."""
    return os.path.join(vm_util.GetTempDir(), uid)

  def Pickle(self, filename=None):
    """Pickles the spec so that it can be unpickled on a subsequent run."""
    with open(
        filename or self._GetPickleFilename(self.uid), 'wb'
    ) as pickle_file:
      pickle.dump(self, pickle_file, 2)

  def Freeze(self):
    """Pickles the spec to a destination, defaulting to tempdir if not found."""
    if not self.freeze_path:
      return
    logging.info('Freezing benchmark_spec to %s', self.freeze_path)
    try:
      self.Pickle(self.freeze_path)
    except FileNotFoundError:
      default_path = f'{vm_util.GetTempDir()}/restore_spec.pickle'
      logging.exception(
          'Could not find file path %s, defaulting freeze to %s.',
          self.freeze_path,
          default_path,
      )
      self.Pickle(default_path)

  @classmethod
  def GetBenchmarkSpec(cls, benchmark_module, config, uid):
    """Unpickles or creates a BenchmarkSpec and returns it.

    Args:
      benchmark_module: The benchmark module object.
      config: BenchmarkConfigSpec. The configuration for the benchmark.
      uid: An identifier unique to this run of the benchmark even if the same
        benchmark is run multiple times with different configs.

    Returns:
      A BenchmarkSpec object.
    """
    if stages.PROVISION in FLAGS.run_stage:
      return cls(benchmark_module, config, uid)

    try:
      with open(cls._GetPickleFilename(uid), 'rb') as pickle_file:
        bm_spec = pickle.load(pickle_file)
    except Exception as e:  # pylint: disable=broad-except
      logging.error(
          'Unable to unpickle spec file for benchmark %s.',
          benchmark_module.BENCHMARK_NAME,
      )
      raise e
    # Always let the spec be deleted after being unpickled so that
    # it's possible to run cleanup even if cleanup has already run.
    bm_spec.deleted = False
    bm_spec.status = benchmark_status.SKIPPED
    context.SetThreadBenchmarkSpec(bm_spec)
    return bm_spec

  def CheckPrerequisites(self) -> None:
    """Checks preconditions for the benchmark_spec."""
    for app_group in self.app_groups:
      app_group.CheckPrerequisites()
    if self.dpb_service:
      self.dpb_service.CheckPrerequisites()
    if self.messaging_service:
      self.messaging_service.CheckPrerequisites()
    if hasattr(self, 'memory_store') and self.memory_store:
      self.memory_store.CheckPrerequisites()