perfkitbenchmarker/providers/aws/aws_cluster.py (226 lines of code) (raw):

"""Class to represent a AWS cluster.""" import json import os from absl import flags as absl_flags import jinja2 from perfkitbenchmarker import background_tasks from perfkitbenchmarker import cluster from perfkitbenchmarker import data from perfkitbenchmarker import linux_packages from perfkitbenchmarker import provider_info from perfkitbenchmarker import regex_util from perfkitbenchmarker import vm_util from perfkitbenchmarker.providers.aws import aws_network from perfkitbenchmarker.providers.aws import aws_virtual_machine from perfkitbenchmarker.providers.aws import flags from perfkitbenchmarker.providers.aws import util FLAGS = absl_flags.FLAGS class AWSClusterSpec(cluster.BaseClusterSpec): """Class to represent a AWS cluster.""" CLOUD = provider_info.AWS class AWSCluster(cluster.BaseCluster): """Class to represent a AWS cluster.""" CLOUD = provider_info.AWS DEFAULT_TEMPLATE = 'cluster/parallel_cluster.yaml.j2' def __init__(self, cluster_spec: AWSClusterSpec): super().__init__(cluster_spec) self._pub_key, _, _ = vm_util.IssueCommand( ['cat', vm_util.GetPublicKeyPath()] ) self._key_manager = aws_virtual_machine.AwsKeyFileManager() # Headnode needs a public IP self._headnode_subnet_id = None self._worker_subnet_id = None self.region = util.GetRegionFromZone(self.zone) self._config_path = os.path.join(vm_util.GetTempDir(), self.name + '.yaml') self._network_stack = None self._vpc: aws_network.AwsVpc = None self.nfs_path = '/opt/apps' def _CreateDependencies(self): """Create dependencies for parallel cluster. Dependencies includes key files, vpcs and cloudformation template. """ # Create keypair, VPC & Subnet self._key_manager.ImportKeyfile(self.region) # Use pcluster to create and configure vpc & subnet # Headnode needs a public IP to be accessible and requires a public subnet. # Workers having more than 1 network interface needs to sit inside # private subnet and connect to public subnet with NAT. # TODO(yuyanting): Consider replicating with low level ec2 commands. # subnet_config_path points to a generic configuration that is only used # to create dependencies such as VPC and subnets. subnet_config_path = os.path.join(vm_util.GetTempDir(), 'subnet') stdout, _, _ = vm_util.IssueCommand([ 'bash', '-c', f'printf "{self.region}\n{self._key_manager.GetKeyNameForRun()}\n' # The follownig does not matter as this step only creates the network. 'slurm\nalinux2\nt2.micro\n1\nqueue1\n1\nt2.micro\n1\ny\n' f'{self.zone}\n1\n" ' f'| pcluster configure --config {subnet_config_path}', ]) # Parse stack name: aws cloudformation delete-stack help self._network_stack = regex_util.ExtractAllMatches( r'Status: (parallelclusternetworking-pubpriv-\d+) ', stdout )[0] suffix = self._network_stack.split('-')[-1] stdout, _, _ = vm_util.IssueCommand([ 'aws', 'ec2', 'describe-vpcs', '--filters', f'Name=tag:Name,Values=ParallelClusterVPC-{suffix}', '--region', self.region, ]) resources_to_tag = [] vpc_id = json.loads(stdout)['Vpcs'][0]['VpcId'] self._vpc = aws_network.AwsVpc(self.region, vpc_id) resources_to_tag.append(vpc_id) stdout, _, _ = vm_util.IssueCommand(['cat', subnet_config_path]) subnet_ids = regex_util.ExtractAllMatches(r'(subnet-\w+)', stdout) self._headnode_subnet_id = subnet_ids[0] self._worker_subnet_id = subnet_ids[1] resources_to_tag.extend(subnet_ids) resources_to_tag.append( json.loads( vm_util.IssueCommand([ 'aws', 'ec2', 'describe-internet-gateways', '--filters', f'Name=attachment.vpc-id,Values={vpc_id}', '--region', self.region, ])[0] )['InternetGateways'][0]['InternetGatewayId'] ) route_tables = json.loads( vm_util.IssueCommand([ 'aws', 'ec2', 'describe-route-tables', '--filters', f'Name=vpc-id,Values={vpc_id}', '--region', self.region, ])[0] )['RouteTables'] for table in route_tables: if not table['Associations'][0]['Main']: resources_to_tag.append(table['RouteTableId']) nat = json.loads( vm_util.IssueCommand([ 'aws', 'ec2', 'describe-nat-gateways', '--filter', f'Name=vpc-id,Values={vpc_id}', '--region', self.region, ])[0] )['NatGateways'][0] resources_to_tag.append(nat['NatGatewayId']) resources_to_tag.append(nat['NatGatewayAddresses'][0]['NetworkInterfaceId']) for resource_to_tag in resources_to_tag: util.AddDefaultTags(resource_to_tag, self.region) self._RenderClusterConfig() def _DeleteDependencies(self): """Deletes vpc and cloudformation template.""" self._key_manager.DeleteKeyfile(self.region) vm_util.IssueCommand([ 'aws', 'cloudformation', 'delete-stack', '--stack-name', self._network_stack, '--region', self.region, ]) # VPC is created outside of PKB scope as part of pcluster intiailization. # But the tool does not take care of vpc cleanup. self._vpc._Delete() # pylint: disable=protected-access def _RenderClusterConfig(self): """Render the config file that will be used to create the cluster.""" tags = util.MakeDefaultTags(FLAGS.timeout_minutes) with open(data.ResourcePath(self.template)) as content: template = jinja2.Template( content.read(), undefined=jinja2.StrictUndefined ) self._config = template.render( name=self.name, os_type=self.os_type.replace('amazonlinux', 'alinux'), region=self.region, num_workers=self.num_workers, worker_machine_type=self.worker_machine_type, headnode_machine_type=self.headnode_machine_type, headnode_subnet_id=self._headnode_subnet_id, worker_subnet_id=self._worker_subnet_id, ssh_key=self._key_manager.GetKeyNameForRun(), tags=tags, # boot disk of headnode is also mounted as NFS nfs_size=self.headnode_spec.boot_disk_size, efa_enabled=FLAGS.aws_efa, enable_spot_vm=FLAGS.aws_spot_instances, # Expose enable_smt flag for consistency across clouds. enable_smt=not FLAGS.disable_smt, ) def _Create(self): with open(self._config_path, 'w') as config_file: config_file.write(self._config) vm_util.IssueCommand([ flags.PCLUSTER_PATH.value, 'create-cluster', '--cluster-configuration', # Create actual cluster with rendered configuration. self._config_path, '--cluster-name', self.name, ]) @vm_util.Retry( poll_interval=1, log_errors=False, retryable_exceptions=(aws_virtual_machine.AwsUnknownStatusError,), ) def _WaitUntilRunning(self): """Waits until the cluster is (or was) running.""" stdout, _, _ = vm_util.IssueCommand([ flags.PCLUSTER_PATH.value, 'describe-cluster', '--cluster-name', self.name, '--region', self.region, ]) status = json.loads(stdout)['clusterStatus'] if status != 'CREATE_COMPLETE': raise aws_virtual_machine.AwsUnknownStatusError( f'Unknown status: {status}; retrying describe-instances command' ) def _PostCreate(self): """Backfill VM object after cluster creation.""" self.worker_vms = [] stdout, _, _ = vm_util.IssueCommand([ flags.PCLUSTER_PATH.value, 'describe-cluster-instances', '--cluster-name', self.name, '--region', self.region, ]) def _PopulateVM(vm, instance): """Extract VM id/ips.""" vm.id = instance.get('instanceId') vm.created = True vm.ip_address = instance.get('publicIpAddress', None) vm.internal_ips = [instance.get('privateIpAddress')] vm.internal_ip = instance.get('privateIpAddress') # AWS VM class does a lot more than just parsing describe output # and backfill ip addresses. # E.g. # - installing & configuring EFA (not needed if created with pcluster) # - wait & configure public IP (not available for workers) # Create headnode object to avoid race condition for instance in json.loads(stdout)['instances']: if instance['nodeType'] == 'HeadNode': self.headnode_vm = self.InstantiateVm(self.headnode_spec) _PopulateVM(self.headnode_vm, instance) for instance in json.loads(stdout)['instances']: if instance['nodeType'] == 'HeadNode': continue vm = self.InstantiateVm(self.workers_spec) self.worker_vms.append(vm) _PopulateVM(vm, instance) vm.ip_address = vm.internal_ips[0] vm.proxy_jump = self.headnode_vm.name self.vms = [self.headnode_vm] + self.worker_vms self.headnode_vm.network.regional_network._reference_count += 1 vm_util.GenerateSSHConfig( self.vms, {'headnode': [self.headnode_vm], 'worker': self.worker_vms} ) def _SetupEnvironment(vm): vm.WaitForBootCompletion() vm.RemoteCommand( f'sudo ln -s {self.nfs_path} {linux_packages.INSTALL_DIR}' ) vm.PrepareVMEnvironment() # Since worker uses headnode as proxy, setup headnode to avoid potential # race condition. _SetupEnvironment(self.headnode_vm) background_tasks.RunThreaded(_SetupEnvironment, self.worker_vms) def _Delete(self): vm_util.IssueCommand([ flags.PCLUSTER_PATH.value, 'delete-cluster', '--cluster-name', self.name, '--region', self.region, ]) def AuthenticateVM(self): """Authenticate a remote machine to access all vms.""" # Already taken care of by pcluster pass