perfkitbenchmarker/providers/aws/aws_network.py (751 lines of code) (raw):
# Copyright 2015 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing classes related to AWS VM networking.
The Firewall class provides a way of opening VM ports. The Network class allows
VMs to communicate via internal ips and isolates PerfKitBenchmarker VMs from
others in
the same project. See https://aws.amazon.com/documentation/vpc/
for more information about AWS Virtual Private Clouds.
"""
import json
import logging
import threading
from absl import flags
from perfkitbenchmarker import context
from perfkitbenchmarker import errors
from perfkitbenchmarker import network
from perfkitbenchmarker import placement_group
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import resource
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.providers.aws import aws_placement_group
from perfkitbenchmarker.providers.aws import aws_vpc_endpoint
from perfkitbenchmarker.providers.aws import util
_AWS_VPC = flags.DEFINE_string(
'aws_vpc',
None,
'The static AWS VPC id to use. If unset, creates a new VPC.',
)
_AWS_SUBNET = flags.DEFINE_string(
'aws_subnet',
None,
'The static AWS subnet id to use. Set value to "default" to use '
'default subnet. If unset, creates a new subnet.',
)
AWS_ENI_COUNT = flags.DEFINE_integer(
'aws_eni_count', 1, 'The number of ENIs per instance.'
)
AWS_NETWORK_CARD_COUNT = flags.DEFINE_integer(
'aws_network_card_count', 1, 'The number of network cards per instance.'
)
flags.DEFINE_bool('aws_efa', False, 'Whether to use an Elastic Fiber Adapter.')
flags.DEFINE_string(
'aws_efa_version',
'1.12.1',
'Version of AWS EFA to use (must also pass in --aws_efa).',
)
flags.DEFINE_integer('aws_efa_count', 1, 'The number of EFAs per instance.')
flags.DEFINE_multi_enum(
'aws_endpoint', [], ['s3'], 'List of AWS endpoints to create'
)
FLAGS = flags.FLAGS
REGION = 'region'
ZONE = 'zone'
NON_PLACEMENT_GROUP_PREFIXES = frozenset(
['t2', 'm3', 't3', 't3a', 't4g', 'vt1', 'm7i-flex']
)
# Reference -
# https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-eni.html#network-cards
DUAL_NETWORK_CARD_MACHINES = frozenset([
'c6in.32xlarge',
'c6in.metal',
'dl1.24xlarge',
'hpc6id.32xlarge',
'hpc7a.12xlarge',
'hpc7a.24xlarge',
'hpc7a.48xlarge',
'hpc7a.96xlarge',
'm6idn.32xlarge',
'm6idn.metal',
'm6in.32xlarge',
'm6in.metal',
'p4d.24xlarge',
'p4de.24xlarge',
'p5.48xlarge',
'p5en.48xlarge',
'r6idn.32xlarge',
'r6idn.metal',
'r6in.32xlarge',
'r6in.metal',
'trn1.32xlarge',
'trn1n.32xlarge',
])
class AwsFirewall(network.BaseFirewall):
"""An object representing the AWS Firewall."""
CLOUD = provider_info.AWS
def __init__(self):
self.firewall_set = set()
self.firewall_icmp_set = set()
self._lock = threading.Lock()
def AllowIcmp(self, vm):
"""Opens the ICMP protocol on the firewall.
Args:
vm: The BaseVirtualMachine object to open the ICMP protocol for.
"""
source = '0.0.0.0/0'
# region, group_id, source
entry = (vm.region, vm.group_id, source)
with self._lock:
if entry in self.firewall_icmp_set:
return
# When defining ICMP firewall rules using the aws cli,
# port specifies the type of ICMP traffic allowed,
# with -1 meaning all ICMP types
# https://docs.aws.amazon.com/cli/latest/reference/ec2/authorize-security-group-ingress.html
authorize_cmd = util.AWS_PREFIX + [
'ec2',
'authorize-security-group-ingress',
'--region=%s' % vm.region,
'--group-id=%s' % vm.group_id,
'--protocol=icmp',
'--port=-1',
'--cidr=%s' % source,
]
util.IssueRetryableCommand(authorize_cmd)
self.firewall_icmp_set.add(entry)
def AllowPort(self, vm, start_port, end_port=None, source_range=None):
"""Opens a port on the firewall.
Args:
vm: The BaseVirtualMachine object to open the port for.
start_port: The first local port to open in a range.
end_port: The last local port to open in a range. If None, only start_port
will be opened.
source_range: List of source CIDRs to allow for this port. If None, all
sources are allowed. i.e. ['0.0.0.0/0']
"""
if vm.is_static or vm.network.is_static:
return
self.AllowPortInSecurityGroup(
vm.region, vm.group_id, start_port, end_port, source_range
)
def AllowPortInSecurityGroup(
self, region, security_group, start_port, end_port=None, source_range=None
):
"""Opens a port on the firewall for a security group.
Args:
region: The region of the security group
security_group: The security group in which to open the ports
start_port: The first local port to open in a range.
end_port: The last local port to open in a range. If None, only start_port
will be opened.
source_range: List of source CIDRs to allow for this port.
"""
end_port = end_port or start_port
source_range = source_range or ['0.0.0.0/0']
for source in source_range:
entry = (start_port, end_port, region, security_group, source)
if entry in self.firewall_set:
continue
if self._RuleExists(region, security_group, start_port, end_port, source):
self.firewall_set.add(entry)
continue
with self._lock:
if entry in self.firewall_set:
continue
authorize_cmd = util.AWS_PREFIX + [
'ec2',
'authorize-security-group-ingress',
'--region=%s' % region,
'--group-id=%s' % security_group,
'--port=%s-%s' % (start_port, end_port),
'--cidr=%s' % source,
]
util.IssueRetryableCommand(authorize_cmd + ['--protocol=tcp'])
util.IssueRetryableCommand(authorize_cmd + ['--protocol=udp'])
self.firewall_set.add(entry)
def _RuleExists(self, region, security_group, start_port, end_port, source):
"""Whether the firewall rule exists in the VPC."""
query_cmd = util.AWS_PREFIX + [
'ec2',
'describe-security-groups',
'--region=%s' % region,
'--group-ids=%s' % security_group,
'--filters',
'Name=ip-permission.cidr,Values={}'.format(source),
'Name=ip-permission.from-port,Values={}'.format(start_port),
'Name=ip-permission.to-port,Values={}'.format(end_port),
]
stdout, _ = util.IssueRetryableCommand(query_cmd)
# "groups" will be an array of all the matching firewall rules
groups = json.loads(stdout)['SecurityGroups']
return bool(groups)
def DisallowAllPorts(self):
"""Closes all ports on the firewall."""
pass
class AwsVpc(resource.BaseResource):
"""An object representing an Aws VPC."""
def __init__(self, region, vpc_id=None, regional_network_index=0):
super().__init__(vpc_id is not None)
self.region = region
self.regional_network_index = regional_network_index
self.cidr = network.GetCidrBlock(self.regional_network_index, 0, 16)
self.id = vpc_id
# Subnets are assigned per-AZ.
# _subnet_index tracks the next unused 10.x.y.0/24 block.
self._subnet_index = 0
# Lock protecting _subnet_index
self._subnet_index_lock = threading.Lock()
self.default_security_group_id = None
if self.id:
self._SetSecurityGroupId()
self._endpoints = [
aws_vpc_endpoint.CreateEndpointService(service, self)
for service in set(FLAGS.aws_endpoint)
]
def _Create(self):
"""Creates the VPC."""
create_cmd = util.AWS_PREFIX + [
'ec2',
'create-vpc',
'--region=%s' % self.region,
'--cidr-block=%s' % self.cidr,
]
stdout, stderr, retcode = vm_util.IssueCommand(
create_cmd, raise_on_failure=False
)
if 'VpcLimitExceeded' in stderr:
raise errors.Benchmarks.QuotaFailure(stderr)
if retcode:
raise errors.Resource.CreationError(
'Failed to create Vpc: %s return code: %s' % (retcode, stderr)
)
response = json.loads(stdout)
self.id = response['Vpc']['VpcId']
self._EnableDnsHostnames()
util.AddDefaultTags(self.id, self.region)
def _PostCreate(self):
self._SetSecurityGroupId()
for endpoint in self._endpoints:
endpoint.Create()
def _SetSecurityGroupId(self):
"""Looks up the VPC default security group."""
groups = self.GetSecurityGroups('default')
if len(groups) != 1:
raise ValueError(
'Expected one security group, got {} in {}'.format(
len(groups), groups
)
)
self.default_security_group_id = groups[0]['GroupId']
logging.info(
'Default security group ID: %s', self.default_security_group_id
)
if FLAGS.aws_efa:
self._AllowSelfOutBound()
def GetSecurityGroups(self, group_name=None):
cmd = util.AWS_PREFIX + [
'ec2',
'describe-security-groups',
'--region',
self.region,
'--filters',
'Name=vpc-id,Values=' + self.id,
]
if group_name:
cmd.append('Name=group-name,Values={}'.format(group_name))
stdout, _, _ = vm_util.IssueCommand(cmd)
return json.loads(stdout)['SecurityGroups']
def _Exists(self):
"""Returns true if the VPC exists."""
describe_cmd = util.AWS_PREFIX + [
'ec2',
'describe-vpcs',
'--region=%s' % self.region,
'--filter=Name=vpc-id,Values=%s' % self.id,
]
stdout, _ = util.IssueRetryableCommand(describe_cmd)
response = json.loads(stdout)
vpcs = response['Vpcs']
assert len(vpcs) < 2, 'Too many VPCs.'
return len(vpcs) > 0
def _EnableDnsHostnames(self):
"""Sets the enableDnsHostnames attribute of this VPC to True.
By default, instances launched in non-default VPCs are assigned an
unresolvable hostname. This breaks the hadoop benchmark. Setting the
enableDnsHostnames attribute to 'true' on the VPC resolves this. See:
http://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/VPC_DHCP_Options.html
"""
enable_hostnames_command = util.AWS_PREFIX + [
'ec2',
'modify-vpc-attribute',
'--region=%s' % self.region,
'--vpc-id',
self.id,
'--enable-dns-hostnames',
'{ "Value": true }',
]
util.IssueRetryableCommand(enable_hostnames_command)
def _PreDelete(self):
"""See base class.
Deletes the AWS endpoints if created.
"""
for endpoint in self._endpoints:
endpoint.Delete()
def _Delete(self):
"""Deletes the VPC."""
delete_cmd = util.AWS_PREFIX + [
'ec2',
'delete-vpc',
'--region=%s' % self.region,
'--vpc-id=%s' % self.id,
]
vm_util.IssueCommand(delete_cmd, raise_on_failure=False)
def NextSubnetCidrBlock(self):
"""Returns the next available /24 CIDR block in this VPC.
Each VPC has a 10.0.0.0/16 CIDR block.
Each subnet is assigned a /24 within this allocation.
Calls to this method return the next unused /24.
Returns:
A string representing the next available /24 block, in CIDR notation.
Raises:
ValueError: when no additional subnets can be created.
"""
with self._subnet_index_lock:
if self._subnet_index >= (1 << 8) - 1:
raise ValueError(
'Exceeded subnet limit ({}).'.format(self._subnet_index)
)
cidr = network.GetCidrBlock(
self.regional_network_index, self._subnet_index
)
self._subnet_index += 1
return cidr
@vm_util.Retry()
def _AllowSelfOutBound(self):
"""Allow outbound connections on all ports in the default security group.
Details: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html
"""
cmd = util.AWS_PREFIX + [
'ec2',
'authorize-security-group-egress',
'--region',
self.region,
'--group-id',
self.default_security_group_id,
'--protocol',
'all',
'--source-group',
self.default_security_group_id,
]
try:
vm_util.IssueCommand(cmd)
except errors.VmUtil.IssueCommandError as ex:
# do not retry if this rule already exists
if ex.message.find('InvalidPermission.Duplicate') == -1: # pytype: disable=attribute-error # enable-nested-classes
raise ex
def AllowVpcPeerInBound(self, peer_vpc):
"""Allow inbound connections on all ports in the default security group from peer vpc.
Args:
peer_vpc: AwsVpc. Peer vpc to allow inbound traffic from.
"""
cmd = util.AWS_PREFIX + [
'ec2',
'authorize-security-group-ingress',
'--region=%s' % self.region,
'--group-id=%s' % self.default_security_group_id,
'--protocol=%s' % 'all',
'--cidr=%s' % peer_vpc.cidr,
]
vm_util.IssueRetryableCommand(cmd)
class AwsSubnet(resource.BaseResource):
"""An object representing an Aws subnet."""
def __init__(self, zone, vpc_id, cidr_block='10.0.0.0/24', subnet_id=None):
super().__init__(subnet_id is not None)
self.zone = zone
self.region = util.GetRegionFromZone(zone)
self.vpc_id = vpc_id
self.id = subnet_id
self.cidr_block = cidr_block
def _Create(self):
"""Creates the subnet."""
create_cmd = util.AWS_PREFIX + [
'ec2',
'create-subnet',
'--region=%s' % self.region,
'--vpc-id=%s' % self.vpc_id,
'--cidr-block=%s' % self.cidr_block,
]
if not util.IsRegion(self.zone):
create_cmd.append('--availability-zone=%s' % self.zone)
stdout, _, _ = vm_util.IssueCommand(create_cmd)
response = json.loads(stdout)
self.id = response['Subnet']['SubnetId']
util.AddDefaultTags(self.id, self.region)
def _Delete(self):
"""Deletes the subnet."""
logging.info(
'Deleting subnet %s. This may fail if all instances in the '
'subnet have not completed termination, but will be retried.',
self.id,
)
delete_cmd = util.AWS_PREFIX + [
'ec2',
'delete-subnet',
'--region=%s' % self.region,
'--subnet-id=%s' % self.id,
]
vm_util.IssueCommand(delete_cmd, raise_on_failure=False)
def _Exists(self):
"""Returns true if the subnet exists."""
return bool(self.GetDict())
def GetDict(self):
"""The 'aws ec2 describe-subnets' for this VPC / subnet id.
Returns:
A dict of the single subnet or an empty dict if there are no subnets.
Raises:
AssertionError: If there is more than one subnet.
"""
describe_cmd = util.AWS_PREFIX + [
'ec2',
'describe-subnets',
'--region=%s' % self.region,
'--filter=Name=vpc-id,Values=%s' % self.vpc_id,
]
if self.id:
describe_cmd.append('--filter=Name=subnet-id,Values=%s' % self.id)
stdout, _ = util.IssueRetryableCommand(describe_cmd)
response = json.loads(stdout)
subnets = response['Subnets']
assert len(subnets) < 2, 'Too many subnets.'
return subnets[0] if subnets else {}
class AwsInternetGateway(resource.BaseResource):
"""An object representing an Aws Internet Gateway."""
def __init__(self, region, vpc_id=None):
super().__init__(vpc_id is not None)
self.region = region
self.vpc_id = None
self.id = None
self.attached = False
if vpc_id:
self.vpc_id = vpc_id
self.id = self.GetDict().get('InternetGatewayId')
# if a gateway was found then it is attached to this VPC
self.attached = bool(self.id)
def _Create(self):
"""Creates the internet gateway."""
create_cmd = util.AWS_PREFIX + [
'ec2',
'create-internet-gateway',
'--region=%s' % self.region,
]
stdout, _, _ = vm_util.IssueCommand(create_cmd)
response = json.loads(stdout)
self.id = response['InternetGateway']['InternetGatewayId']
util.AddDefaultTags(self.id, self.region)
def _Delete(self):
"""Deletes the internet gateway."""
delete_cmd = util.AWS_PREFIX + [
'ec2',
'delete-internet-gateway',
'--region=%s' % self.region,
'--internet-gateway-id=%s' % self.id,
]
vm_util.IssueCommand(delete_cmd, raise_on_failure=False)
def _Exists(self):
"""Returns true if the internet gateway exists."""
return bool(self.GetDict())
def GetDict(self):
"""The 'aws ec2 describe-internet-gateways' for this VPC / gateway id.
Returns:
A dict of the single gateway or an empty dict if there are no gateways.
Raises:
AssertionError: If there is more than one internet gateway.
"""
describe_cmd = util.AWS_PREFIX + [
'ec2',
'describe-internet-gateways',
'--region=%s' % self.region,
]
if self.id:
describe_cmd.append(
'--filter=Name=internet-gateway-id,Values=%s' % self.id
)
elif self.vpc_id:
# Only query with self.vpc_id if the self.id is NOT set -- after calling
# Detach() this object will set still have a vpc_id but will be filtered
# out in a query if using attachment.vpc-id.
# Using self.vpc_id instead of self.attached as the init phase always
# sets it to False.
describe_cmd.append(
'--filter=Name=attachment.vpc-id,Values=%s' % self.vpc_id
)
else:
raise errors.Error('Must have a VPC id or a gateway id')
stdout, _ = util.IssueRetryableCommand(describe_cmd)
response = json.loads(stdout)
internet_gateways = response['InternetGateways']
assert len(internet_gateways) < 2, 'Too many internet gateways.'
return internet_gateways[0] if internet_gateways else {}
def Attach(self, vpc_id):
"""Attaches the internet gateway to the VPC."""
if not self.attached:
self.vpc_id = vpc_id
attach_cmd = util.AWS_PREFIX + [
'ec2',
'attach-internet-gateway',
'--region=%s' % self.region,
'--internet-gateway-id=%s' % self.id,
'--vpc-id=%s' % self.vpc_id,
]
util.IssueRetryableCommand(attach_cmd)
self.attached = True
def Detach(self):
"""Detaches the internet gateway from the VPC."""
def _suppress_failure(stdout, stderr, retcode):
"""Suppresses Detach failure when internet gateway is in a bad state."""
del stdout # unused
if retcode and (
'InvalidInternetGatewayID.NotFound' in stderr
or 'Gateway.NotAttached' in stderr
):
return True
return False
if self.attached and not self.user_managed:
detach_cmd = util.AWS_PREFIX + [
'ec2',
'detach-internet-gateway',
'--region=%s' % self.region,
'--internet-gateway-id=%s' % self.id,
'--vpc-id=%s' % self.vpc_id,
]
util.IssueRetryableCommand(detach_cmd, suppress_failure=_suppress_failure)
self.attached = False
class AwsRouteTable(resource.BaseResource):
"""An object representing a route table."""
def __init__(self, region, vpc_id):
super().__init__()
self.region = region
self.vpc_id = vpc_id
self.id: str = None # set by _PostCreate
def _Create(self):
"""Creates the route table.
This is a no-op since every VPC has a default route table.
"""
pass
def _Delete(self):
"""Deletes the route table.
This is a no-op since the default route table gets deleted with the VPC.
"""
pass
@vm_util.Retry()
def _PostCreate(self):
"""Gets data about the route table."""
self.id = self.GetDict()[0]['RouteTableId']
def GetDict(self):
"""Returns an array of the currently existing routes for this VPC."""
describe_cmd = util.AWS_PREFIX + [
'ec2',
'describe-route-tables',
'--region=%s' % self.region,
'--filters=Name=vpc-id,Values=%s' % self.vpc_id,
]
stdout, _ = util.IssueRetryableCommand(describe_cmd)
return json.loads(stdout)['RouteTables']
def RouteExists(self):
"""Returns true if the 0.0.0.0/0 route already exists."""
route_tables = self.GetDict()
if not route_tables:
return False
for route in route_tables[0].get('Routes', []):
if route.get('DestinationCidrBlock') == '0.0.0.0/0':
return True
return False
def CreateRoute(self, internet_gateway_id):
"""Adds a route to the internet gateway."""
if self.RouteExists():
logging.info('Internet route already exists.')
return
create_cmd = util.AWS_PREFIX + [
'ec2',
'create-route',
'--region=%s' % self.region,
'--route-table-id=%s' % self.id,
'--gateway-id=%s' % internet_gateway_id,
'--destination-cidr-block=0.0.0.0/0',
]
util.IssueRetryableCommand(create_cmd)
def CreateVpcPeeringRoute(self, vpc_peering_id, destination_cidr):
"""Adds a route to peer VPC."""
create_cmd = util.AWS_PREFIX + [
'ec2',
'create-route',
'--region=%s' % self.region,
'--route-table-id=%s' % self.id,
'--vpc-peering-connection-id=%s' % vpc_peering_id,
'--destination-cidr-block=%s' % destination_cidr,
]
util.IssueRetryableCommand(create_cmd)
class _AwsRegionalNetwork(network.BaseNetwork):
"""Object representing regional components of an AWS network.
The benchmark spec contains one instance of this class per region, which an
AwsNetwork may retrieve or create via _AwsRegionalNetwork.GetForRegion.
Attributes:
region: string. The AWS region.
vpc: an AwsVpc instance.
internet_gateway: an AwsInternetGateway instance.
route_table: an AwsRouteTable instance. The default route table.
"""
_regional_network_count = 0
_regional_network_lock = threading.Lock()
CLOUD = provider_info.AWS
def __repr__(self):
return '%s(%r)' % (self.__class__, self.__dict__)
def __init__(self, region, vpc_id=None):
self.region = region
self.internet_gateway = AwsInternetGateway(region, vpc_id)
self.route_table = None
self.created = False
# Locks to ensure that a single thread creates / deletes the instance.
self._create_lock = threading.Lock()
# Tracks the number of AwsNetworks using this _AwsRegionalNetwork.
# Incremented by Create(); decremented by Delete();
# When a Delete() call decrements _reference_count to 0, the RegionalNetwork
# is destroyed.
self._reference_count = 0
self._reference_count_lock = threading.Lock()
# Each regional network needs unique cidr_block for VPC peering.
with _AwsRegionalNetwork._regional_network_lock:
self.vpc = AwsVpc(
self.region, vpc_id, _AwsRegionalNetwork._regional_network_count
)
self.cidr_block = network.GetCidrBlock(
_AwsRegionalNetwork._regional_network_count
)
_AwsRegionalNetwork._regional_network_count += 1
@classmethod
def GetForRegion(cls, region, vpc_id=None):
"""Retrieves or creates an _AwsRegionalNetwork.
Args:
region: string. AWS region name.
vpc_id: string. AWS VPC id.
Returns:
_AwsRegionalNetwork. If an _AwsRegionalNetwork for the same region already
exists in the benchmark spec, that instance is returned. Otherwise, a new
_AwsRegionalNetwork is created and returned.
"""
benchmark_spec = context.GetThreadBenchmarkSpec()
if benchmark_spec is None:
raise errors.Error(
'GetNetwork called in a thread without a BenchmarkSpec.'
)
key = cls.CLOUD, REGION, region
# Because this method is only called from the AwsNetwork constructor, which
# is only called from AwsNetwork.GetNetwork, we already hold the
# benchmark_spec.networks_lock.
if key not in benchmark_spec.regional_networks:
benchmark_spec.regional_networks[key] = cls(region, vpc_id)
return benchmark_spec.regional_networks[key]
def Create(self):
"""Creates the network."""
with self._reference_count_lock:
assert self._reference_count >= 0, self._reference_count
self._reference_count += 1
# Access here must be synchronized. The first time the block is executed,
# the network will be created. Subsequent attempts to create the
# network block until the initial attempt completes, then return.
with self._create_lock:
if self.created:
return
self.vpc.Create()
self.internet_gateway.Create()
self.internet_gateway.Attach(self.vpc.id)
if self.route_table is None:
self.route_table = AwsRouteTable(self.region, self.vpc.id)
self.route_table.Create()
self.route_table.CreateRoute(self.internet_gateway.id)
self.created = True
def Delete(self):
"""Deletes the network."""
# Only actually delete if there are no more references.
with self._reference_count_lock:
assert self._reference_count >= 1, self._reference_count
self._reference_count -= 1
if self._reference_count:
return
if self.created:
self.internet_gateway.Detach()
self.internet_gateway.Delete()
self.vpc.Delete()
class AwsNetworkSpec(network.BaseNetworkSpec):
"""Configuration for creating an AWS network."""
def __init__(self, zone, vpc_id=None, subnet_id=None, machine_type=None):
super().__init__(zone)
self.machine_type = machine_type
if vpc_id or subnet_id:
logging.info(
'Confirming vpc (%s) and subnet (%s) selections', vpc_id, subnet_id
)
my_subnet = AwsSubnet(self.zone, vpc_id, subnet_id=subnet_id).GetDict()
self.vpc_id = my_subnet['VpcId']
self.subnet_id = my_subnet['SubnetId']
self.cidr_block = my_subnet['CidrBlock']
logging.info(
'Using vpc %s subnet %s cidr %s',
self.vpc_id,
self.subnet_id,
self.cidr_block,
)
else:
self.vpc_id = None
self.subnet_id = None
self.cidr_block = None
def _get_default_vpc_id(region: str) -> str:
"""Returns the default VPC ID for the region.
Creates a default VPC if one did not exist previously
Args:
region: Region of the default VPC.
Returns: Default VPC ID
Raises:
UnsupportedConfigError: When default VPC does not exist and cannot be
created.
"""
vpc_cmd = util.AWS_PREFIX + [
'ec2',
'describe-vpcs',
'--region',
region,
'--filters',
'Name=isDefault,Values=true',
]
stdout, _ = vm_util.IssueRetryableCommand(vpc_cmd)
vpcs = json.loads(stdout)['Vpcs']
if vpcs:
return vpcs[0]['VpcId']
create_cmd = util.AWS_PREFIX + [
'ec2',
'create-default-vpc',
'--region',
region,
]
stdout, _, ret = vm_util.IssueCommand(create_cmd, raise_on_failure=False)
if ret:
raise errors.Benchmarks.UnsupportedConfigError(
f'AWS default VPC does not exist for region {region}.'
)
return json.loads(stdout)['Vpc']['VpcId']
def _get_default_subnet_id(zone: str) -> str:
"""Returns the default subnet ID for the zone.
Creates a default subnet if one did not exist previously
Args:
zone: Zone of the default subnet.
Returns: Default Subnet ID
Raises:
UnsupportedConfigError: When default subnet does not exist and cannot be
created.
"""
region = util.GetRegionFromZone(zone)
subnet_cmd = util.AWS_PREFIX + [
'ec2',
'describe-subnets',
'--region',
region,
'--filter',
f'Name=availabilityZone,Values={zone}',
'Name=defaultForAz,Values=true',
]
stdout, _ = vm_util.IssueRetryableCommand(subnet_cmd)
subnets = json.loads(stdout)['Subnets']
if subnets:
return subnets[0]['SubnetId']
create_cmd = util.AWS_PREFIX + [
'ec2',
'create-default-subnet',
'--region',
region,
'--availability-zone',
zone,
]
stdout, _, ret = vm_util.IssueCommand(create_cmd, raise_on_failure=False)
if ret:
raise errors.Benchmarks.UnsupportedConfigError(
f'AWS default subnet does not exist for zone {zone}.'
)
return json.loads(stdout)['Subnet']['SubnetId']
def _is_placement_group_compatible(machine_type):
"""Returns True if VMs of 'machine_type' can be put in a placement group."""
prefix = machine_type.split('.')[0]
return prefix not in NON_PLACEMENT_GROUP_PREFIXES
class AwsNetwork(network.BaseNetwork):
"""Object representing an AWS Network.
Attributes:
region: The AWS region the Network is in.
regional_network: The AwsRegionalNetwork for 'region'.
subnet: the AwsSubnet for this zone.
placement_group: An AwsPlacementGroup instance.
"""
CLOUD = provider_info.AWS
def __repr__(self):
return '%s(%r)' % (self.__class__, self.__dict__)
def __init__(self, spec):
"""Initializes AwsNetwork instances.
Args:
spec: An AwsNetworkSpec object.
"""
super().__init__(spec)
self.region = util.GetRegionFromZone(spec.zone)
self.regional_network = _AwsRegionalNetwork.GetForRegion(
self.region, spec.vpc_id
)
self.subnet = None
self.vpc_peering = None
# Placement Group
no_placement_group = (
not FLAGS.placement_group_style
or FLAGS.placement_group_style == placement_group.PLACEMENT_GROUP_NONE
)
has_optional_pg = (
FLAGS.placement_group_style
== placement_group.PLACEMENT_GROUP_CLOSEST_SUPPORTED
)
if no_placement_group:
self.placement_group = None
elif has_optional_pg and not _is_placement_group_compatible(
spec.machine_type
):
logging.warning(
'machine type %s does not support placement groups. '
'Placement group style set to none.',
spec.machine_type,
)
self.placement_group = None
elif has_optional_pg and len(set(FLAGS.zone)) > 1:
logging.warning(
'inter-zone/inter-region tests do not support placement groups. '
'Placement group style set to none.'
)
self.placement_group = None
elif not _is_placement_group_compatible(spec.machine_type):
raise errors.Benchmarks.UnsupportedConfigError(
f'machine type {spec.machine_type} does not support '
'placement groups. Use placement group style none.'
)
elif len(set(FLAGS.zone)) > 1:
raise errors.Benchmarks.UnsupportedConfigError(
'inter-zone/inter-region tests do not support placement groups. '
'Use placement group style closest_supported.'
)
else:
placement_group_spec = aws_placement_group.AwsPlacementGroupSpec(
'AwsPlacementGroupSpec', flag_values=FLAGS, zone=spec.zone
)
self.placement_group = aws_placement_group.AwsPlacementGroup(
placement_group_spec
)
self.is_static = False
if spec.vpc_id:
self.is_static = True
self.subnet = AwsSubnet(
self.zone,
spec.vpc_id,
cidr_block=self.regional_network.cidr_block,
subnet_id=spec.subnet_id,
)
@staticmethod
def _GetNetworkSpecFromVm(vm):
"""Returns an AwsNetworkSpec created from VM attributes and flags."""
if _AWS_SUBNET.value == 'default':
vpc_id = _get_default_vpc_id(vm.region)
subnet_id = _get_default_subnet_id(vm.zone)
else:
vpc_id = _AWS_VPC.value
subnet_id = _AWS_SUBNET.value
return AwsNetworkSpec(vm.zone, vpc_id, subnet_id, vm.machine_type)
def Create(self):
"""Creates the network."""
self.regional_network.Create()
if self.subnet is None:
cidr = self.regional_network.vpc.NextSubnetCidrBlock()
self.subnet = AwsSubnet(
self.zone, self.regional_network.vpc.id, cidr_block=cidr
)
self.subnet.Create()
if self.placement_group:
self.placement_group.Create()
def Delete(self):
"""Deletes the network."""
if self.subnet:
self.subnet.Delete()
if self.placement_group:
self.placement_group.Delete()
if hasattr(self, 'vpc_peering') and self.vpc_peering:
self.vpc_peering.Delete()
self.regional_network.Delete()
def Peer(self, peering_network):
"""Peers the network with the peering_network.
This method is used for VPC peering. It will connect 2 VPCs together.
Args:
peering_network: BaseNetwork. The network to peer with.
"""
# Skip Peering if the networks are the same
if self.regional_network is peering_network.regional_network:
return
spec = network.BaseVPCPeeringSpec(
self.regional_network, peering_network.regional_network
)
self.vpc_peering = AwsVpcPeering(spec)
peering_network.vpc_peering = self.vpc_peering
self.vpc_peering.Create()
@classmethod
def _GetKeyFromNetworkSpec(cls, spec):
"""Returns a key used to register Network instances."""
return (cls.CLOUD, ZONE, spec.zone)
class AwsVpcPeering(network.BaseVPCPeering):
"""Object containing all information needed to create a VPC Peering Object."""
def _Create(self):
"""Creates the peering object.
Documentation on creating a vpc object:
https://docs.aws.amazon.com/vpc/latest/peering/vpc-pg.pdf
"""
# Creates Peering Connection
create_cmd = util.AWS_PREFIX + [
'ec2',
'create-vpc-peering-connection',
'--region=%s' % self.network_a.region,
'--peer-region=%s' % self.network_b.region,
'--vpc-id=%s' % self.network_a.vpc.id,
'--peer-vpc-id=%s' % self.network_b.vpc.id,
]
stdout, _ = vm_util.IssueRetryableCommand(create_cmd)
response = json.loads(stdout)
self.id = response['VpcPeeringConnection']['VpcPeeringConnectionId']
# Accepts Peering Connection
accept_cmd = util.AWS_PREFIX + [
'ec2',
'accept-vpc-peering-connection',
'--region=%s' % self.network_b.region,
'--vpc-peering-connection-id=%s' % self.id,
]
vm_util.IssueRetryableCommand(accept_cmd)
util.AddDefaultTags(self.id, self.network_a.region)
logging.info(
'Creating VPC peering between %s and %s',
self.network_a.vpc.cidr,
self.network_b.vpc.cidr,
)
# Adds VPC peering to both networks' route tables
self.network_a.route_table.CreateVpcPeeringRoute(
self.id, self.network_b.vpc.cidr
)
self.network_b.route_table.CreateVpcPeeringRoute(
self.id, self.network_a.vpc.cidr
)
# Updates security group to allow inbound traffic from peering networks
self.network_a.vpc.AllowVpcPeerInBound(self.network_b.vpc)
self.network_b.vpc.AllowVpcPeerInBound(self.network_a.vpc)
def _Delete(self):
"""Creates the deletes the peering object."""
delete_cmd = util.AWS_PREFIX + [
'ec2',
'delete-vpc-peering-connection',
'--region=%s' % self.network_a.region,
'--vpc-peering-connection-id=%s' % self.id,
]
vm_util.IssueCommand(delete_cmd)