perfkitbenchmarker/providers/azure/azure_network.py (658 lines of code) (raw):

# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module containing classes related to Azure VM networking. The Firewall class provides a way of opening VM ports. The Network class allows VMs to communicate via internal ips and isolates PerfKitBenchmarker VMs from others in the same project. See http://msdn.microsoft.com/library/azure/jj156007.aspx for more information about Azure Virtual Networks. """ import json import logging import threading from absl import flags from perfkitbenchmarker import context from perfkitbenchmarker import errors from perfkitbenchmarker import network from perfkitbenchmarker import placement_group from perfkitbenchmarker import provider_info from perfkitbenchmarker import resource from perfkitbenchmarker import vm_util from perfkitbenchmarker.providers import azure from perfkitbenchmarker.providers.azure import azure_placement_group from perfkitbenchmarker.providers.azure import flags as azure_flags from perfkitbenchmarker.providers.azure import util FLAGS = flags.FLAGS SSH_PORT = 22 flags.DEFINE_boolean( 'azure_infiniband', False, 'Install Mellanox OpenFabrics drivers' ) _AZ_VERSION_LOG = flags.DEFINE_boolean( 'azure_version_log', True, 'Log az --version.' ) DEFAULT_REGION = 'eastus2' REGION = 'region' ZONE = 'zone' def GetResourceGroup(zone=None): """Get the resource group for the current benchmark.""" spec = context.GetThreadBenchmarkSpec() # This is protected by spec.networks_lock, so there's no race # condition with checking for the attribute and then creating a # resource group. try: return spec.azure_resource_group except AttributeError: group = AzureResourceGroup( 'pkb%s-%s' % (FLAGS.run_uri, spec.uid), zone=zone ) spec.azure_resource_group = group return group class AzureResourceGroup(resource.BaseResource): """A Resource Group, the basic unit of Azure provisioning.""" _az_lock = threading.Condition() _az_version: str | None = None def __init__( self, name, zone=None, use_existing=False, timeout_minutes=None, raise_on_create_failure=True, ): super().__init__() AzureResourceGroup._log_az_version() self.name = name self.use_existing = use_existing self.timeout_minutes = timeout_minutes self.raise_on_create_failure = raise_on_create_failure # A resource group's region doesn't affect the region of # actual resources, but we need to choose *some* region for every # benchmark, even if the user doesn't specify one. self.region = util.GetRegionFromZone( FLAGS.zone[0] if FLAGS.zone else zone or DEFAULT_REGION ) # Whenever an Azure CLI command needs a resource group, it's # always specified the same way. self.args = ['--resource-group', self.name] def _Create(self): if not self.use_existing: # A resource group can own resources in multiple zones, but the # group itself needs to have a region. Therefore, # FLAGS.zone[0]. _, stderr, retcode = vm_util.IssueCommand( [ azure.AZURE_PATH, 'group', 'create', '--name', self.name, '--location', self.region, '--tags', ] + util.GetTags(self.timeout_minutes), raise_on_failure=False, ) if retcode and self.raise_on_create_failure: raise errors.Resource.RetryableCreationError( f'Error creating Azure resource group:\n{stderr}' ) def _Exists(self): stdout, _, _ = vm_util.IssueCommand( [azure.AZURE_PATH, 'group', 'show', '--name', self.name], raise_on_failure=False, ) try: json.loads(stdout) return True except ValueError: return False def _Delete(self): # Ignore delete failures (potentially already deleted) and timeouts as # delete should complete even if we stop waiting for the response. vm_util.IssueCommand( [azure.AZURE_PATH, 'group', 'delete', '--yes', '--name', self.name], timeout=600, raise_on_failure=False, raise_on_timeout=False, ) def AddTag(self, key, value): """Add a single tag to an existing Resource Group. Args: key: tag key value: tag value Raises: errors.resource.CreationError on failure. """ tag_cmd = [ azure.AZURE_PATH, 'group', 'update', '--name', self.name, '--set', 'tags.' + util.FormatTag(key, value), ] _, _, retcode = vm_util.IssueCommand(tag_cmd, raise_on_failure=False) if retcode: raise errors.Resource.CreationError('Error tagging Azure resource group.') @classmethod def _log_az_version(cls): if not _AZ_VERSION_LOG.value: return with cls._az_lock: if not cls._az_version: cls.az_version, _, _ = vm_util.IssueCommand( [azure.AZURE_PATH, '--version'], raise_on_failure=True ) class AzureStorageAccount(resource.BaseResource): """Object representing an Azure Storage Account.""" total_storage_accounts = 0 def __init__( self, name: str, region: str, # TODO(pclay): Update default kind to StorageV2 kind: str = 'Storage', storage_type: str | None = None, access_tier: str | None = None, resource_group: AzureResourceGroup | None = None, use_existing: bool = False, raise_on_create_failure: bool = True, ): super().__init__() self.name = name self.resource_group = resource_group or GetResourceGroup() self.region = region self.storage_type = storage_type self.kind = kind self.access_tier = access_tier self.use_existing = use_existing self.raise_on_create_failure = raise_on_create_failure AzureStorageAccount.total_storage_accounts += 1 def _Create(self): """Creates the storage account.""" if self.use_existing: return create_cmd = ( [ azure.AZURE_PATH, 'storage', 'account', 'create', '--kind', self.kind, '--name', self.name, '--tags', ] + util.GetTags(self.resource_group.timeout_minutes) + self.resource_group.args ) if self.region: create_cmd.extend(['--location', self.region]) if self.storage_type: create_cmd.extend(['--sku', self.storage_type]) if self.access_tier: create_cmd.extend(['--access-tier', self.access_tier]) vm_util.IssueCommand( create_cmd, raise_on_failure=self.raise_on_create_failure ) def _PostCreate(self): """Get our connection string and our keys.""" # https://learn.microsoft.com/en-us/azure/templates/microsoft.storage/storageaccounts?pivots=deployment-language-bicep#resource-format response = self._Get() kind = response['kind'] if kind != self.kind: raise ValueError( f'Storage account {self.name} has kind {kind}, expected {self.kind}' ) access_tier = response['accessTier'] if access_tier: if not self.access_tier: self.access_tier = access_tier elif self.access_tier != access_tier: raise ValueError( f'Storage account {self.name} has access tier' f' {access_tier}, expected {self.access_tier}' ) sku = response['sku']['name'] if not self.storage_type: self.storage_type = sku elif sku != self.storage_type: raise ValueError( f'Storage account {self.name} has SKU {sku}, expected' f' {self.storage_type}' ) # These fields are sensitive and require their own RPCs self.connection_string = util.GetAzureStorageConnectionString( self.name, resource_group=self.resource_group.name ) self.connection_args = ['--connection-string', self.connection_string] self.key = util.GetAzureStorageAccountKey( self.name, self.resource_group.args ) def _Delete(self): """Deletes the storage account.""" if self.use_existing: return delete_cmd = [ azure.AZURE_PATH, 'storage', 'account', 'delete', '--name', self.name, '--yes', ] + self.resource_group.args vm_util.IssueCommand(delete_cmd, raise_on_failure=False) def _Get(self): """Gets the storage account.""" stdout, stderr, retcode = vm_util.IssueCommand( [ azure.AZURE_PATH, 'storage', 'account', 'show', '--output', 'json', '--name', self.name, ] + self.resource_group.args, raise_on_failure=False, ) if retcode or not stdout: raise errors.Resource.GetError( f'Error getting Azure storage account:\n{stderr}' ) return json.loads(stdout) def _Exists(self): """Returns true if the storage account exists.""" try: self._Get() return True except errors.Resource.GetError: return False class AzureVirtualNetwork(network.BaseNetwork): """Object representing an Azure Virtual Network. The benchmark spec contains one instance of this class per region, which an AzureNetwork may retrieve or create via AzureVirtualNetwork.GetForRegion. Attributes: name: string. Name of the virtual network. resource_group: Resource Group instance that network belongs to. region: string. Azure region of the network. """ # Initializes an address space for a new AzureVirtualNetwork _regional_network_count = 0 vnet_lock = threading.Lock() CLOUD = provider_info.AZURE def __init__(self, spec, region, name): super().__init__(spec) self.name = name self.resource_group = GetResourceGroup() self.region = region self.args = ['--vnet-name', self.name] self.address_index = 0 self.regional_index = AzureVirtualNetwork._regional_network_count self.address_spaces = [] self.is_created = False @classmethod def GetForRegion(cls, spec, region, name) -> 'AzureVirtualNetwork | None': """Retrieves or creates an AzureVirtualNetwork. Args: spec: BaseNetworkSpec. Spec for Azure Network. region: string. Azure region name. name: string. Azure Network Name. Returns: AzureVirtualNetwork | None. If AZURE_SUBNET_ID is specified, an existing network is used via a subnet, and PKB does not add it to the benchmark spec so this method returns None. If an AzureVirtualNetwork for the same region already exists in the benchmark spec, that instance is returned. Otherwise, a new AzureVirtualNetwork is created and returned. """ benchmark_spec = context.GetThreadBenchmarkSpec() if benchmark_spec is None: raise errors.Error( 'GetNetwork called in a thread without a BenchmarkSpec.' ) if azure_flags.AZURE_SUBNET_ID.value: # AzureVirtualNetworks are not Resources, so we just return None here as # the network exists but will be accessed via the subnet. return None key = cls.CLOUD, REGION, region # Because this method is only called from the AzureNetwork constructor, # which is only called from AzureNetwork.GetNetwork, we already hold the # benchmark_spec.networks_lock. if key not in benchmark_spec.regional_networks: benchmark_spec.regional_networks[key] = cls(spec, region, name) AzureVirtualNetwork._regional_network_count += 1 return benchmark_spec.regional_networks[key] def GetNextAddressSpace(self) -> str: """Returns the next available address space for next subnet.""" with self.vnet_lock: next_address_space = network.GetCidrBlock( self.regional_index, self.address_index ) self.address_index += 1 return next_address_space def Create(self): """Creates the virtual network.""" with self.vnet_lock: if self.is_created: return logging.info( 'Creating %d Azure subnets in %s', len(self.address_spaces), self.region, ) vm_util.IssueRetryableCommand( [ azure.AZURE_PATH, 'network', 'vnet', 'create', '--location', self.region, '--name', self.name, '--address-prefixes', ] + self.address_spaces + self.resource_group.args ) self.is_created = True def Delete(self): """Deletes the virtual network.""" pass @vm_util.Retry() def Exists(self): """Returns true if the virtual network exists.""" stdout, _, _ = vm_util.IssueCommand( [ azure.AZURE_PATH, 'network', 'vnet', 'show', '--output', 'json', '--name', self.name, ] + self.resource_group.args, raise_on_failure=False, ) return bool(json.loads(stdout)) class AzureSubnet(resource.BaseResource): """Object representing an Azure Subnet.""" def __init__(self, vnet, name): super().__init__() if azure_flags.AZURE_SUBNET_ID.value: # use pre-existing subnet self.id = azure_flags.AZURE_SUBNET_ID.value self.user_managed = True else: # create a new subnet. # id could be set after _Exists succeeds, but is only used by AzureNIC() # before AzureSubnet.Create() is called. self.id = None self.user_managed = False self.resource_group = GetResourceGroup() self.vnet = vnet self.name = name self.args = ['--subnet', self.name] self.address_space = '' if self.vnet is not None: self.address_space = self.vnet.GetNextAddressSpace() # Append to vnet address_spaces so that it can be used in vnet _Create() self.vnet.address_spaces.append(self.address_space) def _Create(self): vm_util.IssueCommand( [ azure.AZURE_PATH, 'network', 'vnet', 'subnet', 'create', '--vnet-name', self.vnet.name, '--address-prefix', self.address_space, '--name', self.name, ] + self.resource_group.args ) @vm_util.Retry() def _Exists(self): stdout, _, _ = vm_util.IssueCommand( [ azure.AZURE_PATH, 'network', 'vnet', 'subnet', 'show', '--vnet-name', self.vnet.name, '--output', 'json', '--name', self.name, ] + self.resource_group.args, raise_on_failure=False, ) return bool(json.loads(stdout)) def _Delete(self): pass class AzureNetworkSecurityGroup(resource.BaseResource): """Object representing an Azure Network Security Group.""" def __init__(self, region, subnet, name): super().__init__() self.region = region self.subnet = subnet self.name = name self.resource_group = GetResourceGroup() self.args = ['--nsg', self.name] self.rules_lock = threading.Lock() # Mapping of (start_port, end_port, source) -> rule name, used to # deduplicate rules. We expect duplicate rules because PKB will # call AllowPort() for each VM on a subnet, but the rules are # actually applied to the entire subnet. self.rules = {} # True if the special 'DenyAll' rule is present. self.have_deny_all_rule = False def _Create(self): vm_util.IssueRetryableCommand( [ azure.AZURE_PATH, 'network', 'nsg', 'create', '--location', self.region, '--name', self.name, ] + self.resource_group.args ) @vm_util.Retry() def _Exists(self): stdout, _, _ = vm_util.IssueCommand( [ azure.AZURE_PATH, 'network', 'nsg', 'show', '--output', 'json', '--name', self.name, ] + self.resource_group.args, raise_on_failure=False, ) return bool(json.loads(stdout)) def _Delete(self): pass def _GetRulePriority(self, rule, rule_name): # Azure priorities are between 100 and 4096, but we reserve 4095 # for the special DenyAll rule created by DisallowAllPorts. rule_priority = 100 + len(self.rules) if rule_priority >= 4095: raise ValueError('Too many firewall rules!') self.rules[rule] = rule_name return rule_priority def AttachToSubnet(self): vm_util.IssueRetryableCommand( [ azure.AZURE_PATH, 'network', 'vnet', 'subnet', 'update', '--name', self.subnet.name, '--network-security-group', self.name, ] + self.resource_group.args + self.subnet.vnet.args ) def AllowPort( self, vm, start_port, end_port=None, source_range=None, protocol='*' ): """Open a port or port range. Args: vm: the virtual machine to open the port for. start_port: either a single port or the start of a range. end_port: if given, the end of the port range. source_range: List of source CIDRs to allow for this port. If None, all sources are allowed. i.e. ['0.0.0.0/0'] protocol: Network protocol this rule applies to. One of {*, Ah, Esp, Icmp, Tcp, Udp}. See https://learn.microsoft.com/en-us/cli/azure/network/nsg/rule?view=azure-cli-latest#az-network-nsg-rule-create Raises: ValueError: when there are too many firewall rules. """ source_range = source_range or ['0.0.0.0/0'] end_port = end_port or start_port source_range.sort() # Replace slashes as they are not allowed in an azure rule name. source_range_str = ','.join(source_range).replace('/', '_') rule = (start_port, end_port, source_range_str) with self.rules_lock: if rule in self.rules: return port_range = '%s-%s' % (start_port, end_port) rule_name = 'allow-%s-%s' % (port_range, source_range_str) rule_priority = self._GetRulePriority(rule, rule_name) network_cmd = ( [ azure.AZURE_PATH, 'network', 'nsg', 'rule', 'create', '--name', rule_name, '--destination-port-range', port_range, '--protocol', protocol, '--access', 'Allow', '--priority', str(rule_priority), ] + ['--source-address-prefixes'] + source_range ) network_cmd.extend(self.resource_group.args + self.args) vm_util.IssueRetryableCommand(network_cmd) def AllowIcmp(self): source_address = '0.0.0.0/0' # '*' in Azure represents all ports rule = ('*', source_address) rule_name = 'allow-icmp' with self.rules_lock: if rule in self.rules: return rule_priority = self._GetRulePriority(rule, rule_name) network_cmd = [ azure.AZURE_PATH, 'network', 'nsg', 'rule', 'create', '--name', rule_name, '--access', 'Allow', '--source-address-prefixes', source_address, '--source-port-ranges', '*', '--destination-port-ranges', '*', '--priority', str(rule_priority), '--protocol', 'Icmp', ] network_cmd.extend(self.resource_group.args + self.args) vm_util.IssueRetryableCommand(network_cmd) class AzureFirewall(network.BaseFirewall): """A firewall on Azure is a Network Security Group. NSGs are per-subnet, but this class is per-provider, so we just proxy methods through to the right NSG instance. """ CLOUD = provider_info.AZURE def AllowPort(self, vm, start_port, end_port=None, source_range=None): """Opens a port on the firewall. This is a no-op if there is no firewall configured for the VM. This can happen when using an already existing subnet. Args: vm: The BaseVirtualMachine object to open the port for. start_port: The local port to open. end_port: if given, open the range [start_port, end_port]. source_range: unsupported at present. """ if vm.network.nsg: vm.network.nsg.AllowPort( vm, start_port, end_port=end_port, source_range=source_range ) def DisallowAllPorts(self): """Closes all ports on the firewall.""" pass def AllowIcmp(self, vm): """Opens the ICMP protocol on the firewall. Args: vm: The BaseVirtualMachine object to open the ICMP protocol for. """ vm.network.nsg.AllowIcmp() class AzureNetwork(network.BaseNetwork): """Locational network components. A container object holding all of the network-related objects that we need for an Azure zone (aka region). """ CLOUD = provider_info.AZURE def __init__(self, spec): super().__init__(spec) self.resource_group = GetResourceGroup() self.region = util.GetRegionFromZone(self.zone) self.availability_zone = util.GetAvailabilityZoneFromZone(self.zone) is_dedicated_host = bool(FLAGS.dedicated_hosts) in_availability_zone = bool(self.availability_zone) # Placement Group no_placement_group = ( not FLAGS.placement_group_style or FLAGS.placement_group_style == placement_group.PLACEMENT_GROUP_NONE ) has_optional_pg = ( FLAGS.placement_group_style == placement_group.PLACEMENT_GROUP_CLOSEST_SUPPORTED ) if no_placement_group: self.placement_group = None elif has_optional_pg and len(set(FLAGS.zone)) > 1: logging.warning( 'inter-zone/inter-region tests do not support placement groups. ' 'Placement group style set to none.' ) self.placement_group = None elif len(set(FLAGS.zone)) > 1: raise errors.Benchmarks.UnsupportedConfigError( 'inter-zone/inter-region tests do not support placement groups. ' 'Use placement group style closest_supported.' ) # With dedicated hosting and/or an availability zone, an availability set # cannot be created elif ( FLAGS.placement_group_style == azure_placement_group.AVAILABILITY_SET and (is_dedicated_host or in_availability_zone) ): self.placement_group = None else: placement_group_spec = azure_placement_group.AzurePlacementGroupSpec( 'AzurePlacementGroupSpec', flag_values=FLAGS, zone=self.zone, resource_group=self.resource_group.name, ) self.placement_group = azure_placement_group.AzurePlacementGroup( placement_group_spec ) # Storage account names can't include separator characters :(. storage_account_prefix = 'pkb%s' % FLAGS.run_uri # Storage account names must be 3-24 characters long and use # numbers and lower-case letters only, which leads us to this # awful naming scheme. suffix = 'storage%d' % AzureStorageAccount.total_storage_accounts self.storage_account = AzureStorageAccount( name=storage_account_prefix[: 24 - len(suffix)] + suffix, region=self.region, storage_type=FLAGS.azure_storage_type, ) # Length restriction from https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/resource-name-rules#microsoftnetwork pylint: disable=line-too-long prefix = '%s-%s' % (self.resource_group.name, self.region) vnet_name = prefix + '-vnet' if len(vnet_name) > 64: vnet_name = prefix[:59] + '-vnet' self.vnet = AzureVirtualNetwork.GetForRegion(spec, self.region, vnet_name) subnet_name = vnet_name if self.availability_zone: subnet_name += '-' + self.availability_zone subnet_name += '-subnet' self.subnet = AzureSubnet(self.vnet, subnet_name) if azure_flags.AZURE_SUBNET_ID.value: # usage of an nsg is not currently supported with an existing subnet. self.nsg = None else: self.nsg = AzureNetworkSecurityGroup( self.region, self.subnet, self.subnet.name + '-nsg' ) @vm_util.Retry( retryable_exceptions=( errors.Resource.RetryableCreationError, # Many nested commands are not classified as retryable. # Most should have their own retry logica and raise a different error. errors.VmUtil.IssueCommandError, ) ) def Create(self): """Creates the network.""" # If the benchmark includes multiple zones, # self.resource_group.Create() will be called more than once. But # BaseResource will prevent us from running the underlying Azure # commands more than once, so that is fine. self.resource_group.Create() if self.placement_group: self.placement_group.Create() self.storage_account.Create() if self.vnet: self.vnet.Create() if self.subnet: self.subnet.Create() if self.nsg: self.nsg.Create() self.nsg.AttachToSubnet() def Delete(self): """Deletes the network.""" # If the benchmark includes multiple zones, this will be called # multiple times, but there will be no bad effects from multiple # deletes. self.resource_group.Delete() def Peer(self, peering_network): """Peers the network with the peering_network. This method is used for VPC peering. It will connect 2 VPCs together. Args: peering_network: BaseNetwork. The network to peer with. """ # Skip Peering if the networks are the same if self.vnet is peering_network.vnet: return spec = network.BaseVPCPeeringSpec(self.vnet, peering_network.vnet) self.vpc_peering = AzureVpcPeering(spec) peering_network.vpc_peering = self.vpc_peering self.vpc_peering.Create() @classmethod def _GetKeyFromNetworkSpec(cls, spec): """Returns a key used to register Network instances.""" return (cls.CLOUD, ZONE, spec.zone) class AzureVpcPeering(network.BaseVPCPeering): """Object containing all information needed to create a VPC Peering Object.""" def _Create(self): """Creates the peering object.""" self.name = '%s-%s-%s' % ( self.network_a.resource_group.name, self.network_a.region, self.network_b.region, ) # Creates Peering Connection create_cmd = [ azure.AZURE_PATH, 'network', 'vnet', 'peering', 'create', '--name', self.name, '--vnet-name', self.network_a.name, '--remote-vnet', self.network_b.name, '--allow-vnet-access', ] + self.network_a.resource_group.args vm_util.IssueRetryableCommand(create_cmd) # Accepts Peering Connection accept_cmd = [ azure.AZURE_PATH, 'network', 'vnet', 'peering', 'create', '--name', self.name, '--vnet-name', self.network_b.name, '--remote-vnet', self.network_a.name, '--allow-vnet-access', ] + self.network_b.resource_group.args vm_util.IssueRetryableCommand(accept_cmd) logging.info( 'Created VPC peering between %s and %s', self.network_a.address_spaces[0], self.network_b.address_spaces[0], ) def _Delete(self): """Deletes the peering connection.""" # Gets Deleted with resource group deletion pass