perfkitbenchmarker/providers/gcp/gcp_cluster.py (134 lines of code) (raw):

# Copyright 2025 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Class to represent a GCE cluster.""" import os from absl import flags as absl_flags import jinja2 from perfkitbenchmarker import background_tasks from perfkitbenchmarker import cluster from perfkitbenchmarker import data from perfkitbenchmarker import linux_packages from perfkitbenchmarker import linux_virtual_machine from perfkitbenchmarker import provider_info from perfkitbenchmarker import vm_util from perfkitbenchmarker.providers.gcp import flags from perfkitbenchmarker.providers.gcp import util FLAGS = absl_flags.FLAGS _CONFIG_FILE_NAME = 'cluster.yaml' class GceClusterSpec(cluster.BaseClusterSpec): """Class to represent a GCE cluster.""" CLOUD = provider_info.GCP class GceCluster(cluster.BaseCluster): """Class to represent a GCE cluster.""" CLOUD = provider_info.GCP # TODO(yuyanting): Add cluster type as attributes. # We probably will need to implement different type of clusters with # different default templates and Render functions. DEFAULT_TEMPLATE = 'cluster/cluster_toolkit.yaml.j2' def __init__(self, cluster_spec: GceClusterSpec): """Initialize GceCluster class. Args: cluster_spec: cluster.GceClusterSpec object. """ super().__init__(cluster_spec) self.project: str = cluster_spec.workers.vm_spec.project self._config_path: str = os.path.join( vm_util.GetTempDir(), _CONFIG_FILE_NAME) self._pub_key, _, _ = vm_util.IssueCommand( ['cat', vm_util.GetPublicKeyPath()] ) self.nfs_path: str = '/opt/apps' def _CreateDependencies(self): """Method that will be called once before _CreateResource() is called. Supplying this method is optional. It is intended to allow additional flexibility in creating resource dependencies separately from _Create(). """ self._RenderClusterConfig() def _RenderClusterConfig(self): """Render the config file that will be used to create the cluster.""" tags = util.GetDefaultTags(FLAGS.timeout_minutes) controller_tags = tags.copy() controller_tags['ssh-keys'] = 'perfkit:' + self._pub_key compute_tags = tags.copy() compute_tags['ssh-keys'] = 'root:' + self._pub_key with open(data.ResourcePath(self.template)) as content: template = jinja2.Template( content.read(), undefined=jinja2.StrictUndefined ) # Disable SMT when possible (default in cluster toolkit) threads_per_core = 0 if FLAGS['disable_smt'].present: threads_per_core = 1 if FLAGS.disable_smt else 2 self._config = template.render( name=self.name, zone=self.zone, region=util.GetRegionFromZone(self.zone), num_workers=self.num_workers, worker_machine_type=self.worker_machine_type, headnode_machine_type=self.headnode_machine_type, image_family=self.workers_spec.image_family, image_project=self.workers_spec.image_project, project=self.project, # boot disk of headnode is also mounted as NFS nfs_size=self.headnode_spec.boot_disk_size, compute_tags=compute_tags, controller_tags=controller_tags, enabe_spot_vm=FLAGS.gce_preemptible_vms, threads_per_core=threads_per_core, ) def _Create(self): """Create GCP cluster with cluster toolkit.""" with open(self._config_path, 'w') as config_file: config_file.write(self._config) vm_util.IssueCommand([ flags.GCLUSTER_PATH.value, 'deploy', self._config_path, '--force', '--auto-approve', f'--out={vm_util.GetTempDir()}', '-l', 'IGNORE' ]) def _PostCreate(self): """Post create actions after GCP cluster is created. The method does the following: 1. Create VM objects and backfill VM information based on VM name. 2. Prepare VM environment. 3. Configure SSH on both headnode and workers. """ def _UpdateHeadNode(vm): vm.name = f'{self.name}-controller' self.headnode_vm = self.BackfillVm(self.headnode_spec, _UpdateHeadNode) self._WaitUntilReady() for i in range(self.num_workers): def _UpdateWorker(vm): vm.name = f'{self.name}-computenodeset-{i}' # pylint: disable=cell-var-from-loop vm.user_name = 'root' if self.workers_static_disk: vm.disks = [self.workers_static_disk] self.worker_vms.append( self.BackfillVm( self.workers_spec, _UpdateWorker, ) ) # The workers may provision as we issue this command for the first time. self.RemoteCommand( 'sudo sed -i ' '"s/PermitRootLogin no/PermitRootLogin yes/g" ' '/etc/ssh/sshd_config', timeout=600, ) self.RemoteCommand('sudo service sshd restart', timeout=120) self.headnode_vm.RemoteCommand(f'sudo chmod -R 755 {self.nfs_path}') self.vms = [self.headnode_vm] + self.worker_vms def _SetupEnvironment(vm): vm.RemoteCommand( f'sudo ln -s {self.nfs_path} {linux_packages.INSTALL_DIR}' ) vm.PrepareVMEnvironment() background_tasks.RunThreaded(_SetupEnvironment, self.vms) vm_util.GenerateSSHConfig( self.vms, { 'headnode': [self.headnode_vm], 'worker': self.worker_vms, }, ) def _Delete(self): """Delete GCP cluster with cluster toolkit.""" vm_util.IssueCommand([ flags.GCLUSTER_PATH.value, 'destroy', f'{vm_util.GetTempDir()}/{self.name}', '--auto-approve', ]) def AuthenticateVM(self): """Authenticate all VMs in the cluster to access each other.""" for vm in self.vms: if vm.has_private_key: continue vm.RemoteHostCopy( vm_util.GetPrivateKeyPath(), linux_virtual_machine.REMOTE_KEY_PATH ) vm.RemoteCommand( 'echo "Host *\n StrictHostKeyChecking no\n User=root\n" >' ' ~/.ssh/config' ) vm.RemoteCommand('chmod 600 ~/.ssh/config') vm.has_private_key: bool = True class H4dCluster(GceCluster): """Class representing a H4D cluster.""" DEFAULT_TEMPLATE = 'cluster/h4d.yaml.j2' TYPE = 'h4d'