# perfkitbenchmarker/providers/gcp/gce_virtual_machine.py
# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class to represent a GCE Virtual Machine object.
Zones:
run 'gcloud compute zones list'
Machine Types:
run 'gcloud compute machine-types list'
Images:
run 'gcloud compute images list'
All VM specifics are self-contained and the class provides methods to
operate on the VM: boot, shutdown, etc.
"""
import collections
import copy
import datetime
import itertools
import json
import logging
import posixpath
import re
import threading
import time
from typing import Dict, List, Tuple
from absl import flags
from perfkitbenchmarker import boot_disk
from perfkitbenchmarker import custom_virtual_machine_spec
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import flags as pkb_flags
from perfkitbenchmarker import linux_virtual_machine as linux_vm
from perfkitbenchmarker import os_types
from perfkitbenchmarker import placement_group
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import resource
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.configs import option_decoders
from perfkitbenchmarker.providers.gcp import flags as gcp_flags
from perfkitbenchmarker.providers.gcp import gce_disk
from perfkitbenchmarker.providers.gcp import gce_disk_strategies
from perfkitbenchmarker.providers.gcp import gce_network
from perfkitbenchmarker.providers.gcp import gcs
from perfkitbenchmarker.providers.gcp import util
import yaml
FLAGS = flags.FLAGS
# GCE Instance life cycle:
# https://cloud.google.com/compute/docs/instances/instance-life-cycle
RUNNING = 'RUNNING'
TERMINATED = 'TERMINATED'
# A TERMINATED instance is treated as deleted for Exists() purposes.
INSTANCE_DELETED_STATUSES = frozenset([TERMINATED])
# Statuses an instance passes through while changing state; it still exists.
INSTANCE_TRANSITIONAL_STATUSES = frozenset([
    'PROVISIONING',
    'STAGING',
    'STOPPING',
    'REPAIRING',
    'SUSPENDING',
])
INSTANCE_EXISTS_STATUSES = INSTANCE_TRANSITIONAL_STATUSES | frozenset(
    [RUNNING, 'SUSPENDED', 'STOPPED']
)
INSTANCE_KNOWN_STATUSES = INSTANCE_EXISTS_STATUSES | INSTANCE_DELETED_STATUSES

# Gcloud operations are complete when their 'status' is 'DONE'.
OPERATION_DONE = 'DONE'

# 2h timeout for LM (live migration) notification
LM_NOTIFICATION_TIMEOUT_SECONDS = 60 * 60 * 2
# 10m wait time prior to checking log for LM status
LM_UNAVAILABLE_STATUS_WAIT_TIME_MIN = 10

# Local SSD attachment interfaces.
NVME = 'NVME'
SCSI = 'SCSI'

# Substrings of gcloud stderr used to classify create failures.
_INSUFFICIENT_HOST_CAPACITY = (
    'does not have enough resources available to fulfill the request.'
)
_FAILED_TO_START_DUE_TO_PREEMPTION = (
    'Instance failed to start due to preemption.'
)
# Timeout (seconds) for the 'gcloud compute instances create' call.
_GCE_VM_CREATE_TIMEOUT = 1200
_GCE_NVIDIA_GPU_PREFIX = 'nvidia-'
_GCE_NVIDIA_TESLA_GPU_PREFIX = 'nvidia-tesla-'
# Shutdown script installed on preemptible VMs: on preemption it writes a
# marker object to GCS so PKB can later detect the early termination.
_SHUTDOWN_SCRIPT = 'su "{user}" -c "echo | gsutil cp - {preempt_marker}"'
# Metadata-server endpoint that reports whether the VM was preempted.
METADATA_PREEMPT_URI = (
    'http://metadata.google.internal/computeMetadata/v1/instance/preempted'
)
_METADATA_PREEMPT_CMD = (
    f'curl {METADATA_PREEMPT_URI} -H "Metadata-Flavor: Google"'
)

# Machine type to ARM architecture.
_MACHINE_TYPE_PREFIX_TO_ARM_ARCH = {
    't2a': 'neoverse-n1',
    'c3a': 'ampere1',
    'c4a': 'neoverse-v2',
}

# The A2 and A3 machine families, unlike some other GCP offerings, have a
# preset type and number of GPUs, so we set those attributes directly from the
# machine_type.
_FIXED_GPU_MACHINE_TYPES = {
    # A100 GPUs
    # https://cloud.google.com/blog/products/compute/announcing-google-cloud-a2-vm-family-based-on-nvidia-a100-gpu
    'a2-highgpu-1g': (virtual_machine.GPU_A100, 1),
    'a2-highgpu-2g': (virtual_machine.GPU_A100, 2),
    'a2-highgpu-4g': (virtual_machine.GPU_A100, 4),
    'a2-highgpu-8g': (virtual_machine.GPU_A100, 8),
    'a2-megagpu-16g': (virtual_machine.GPU_A100, 16),
    'a2-ultragpu-1g': (virtual_machine.GPU_A100, 1),
    'a2-ultragpu-2g': (virtual_machine.GPU_A100, 2),
    'a2-ultragpu-4g': (virtual_machine.GPU_A100, 4),
    'a2-ultragpu-8g': (virtual_machine.GPU_A100, 8),
    # H100 GPUs
    'a3-highgpu-1g': (virtual_machine.GPU_H100, 1),
    'a3-highgpu-2g': (virtual_machine.GPU_H100, 2),
    'a3-highgpu-4g': (virtual_machine.GPU_H100, 4),
    'a3-highgpu-8g': (virtual_machine.GPU_H100, 8),
    'a3-megagpu-8g': (virtual_machine.GPU_H100, 8),
    # L4 GPUs
    # https://cloud.google.com/compute/docs/accelerator-optimized-machines#g2-vms
    'g2-standard-4': (virtual_machine.GPU_L4, 1),
    'g2-standard-8': (virtual_machine.GPU_L4, 1),
    'g2-standard-12': (virtual_machine.GPU_L4, 1),
    'g2-standard-16': (virtual_machine.GPU_L4, 1),
    'g2-standard-24': (virtual_machine.GPU_L4, 2),
    'g2-standard-32': (virtual_machine.GPU_L4, 1),
    'g2-standard-48': (virtual_machine.GPU_L4, 4),
    'g2-standard-96': (virtual_machine.GPU_L4, 8),
}

# Metadata key recorded on VMs whose teardown PKB intentionally skipped.
PKB_SKIPPED_TEARDOWN_METADATA_KEY = 'pkb_skipped_teardown'
class GceRetryDescribeOperationsError(Exception):
  """Signals that Exists() should retry its operation polling.

  Raised when 'describe operations' fails with an internal error, or when
  the 'instances create' operation has not yet completed.
  """
class GceServiceUnavailableError(Exception):
  """Signals that _Exists should retry.

  Raised when the describe output indicates that 'The service is currently
  unavailable'.
  """
class GceVmSpec(virtual_machine.BaseVmSpec):
  """Object containing the information needed to create a GceVirtualMachine.

  Attributes:
    cpus: None or int. Number of vCPUs for custom VMs.
    memory: None or string. For custom VMs, a string representation of the size
      of memory, expressed in MiB or GiB. Must be an integer number of MiB (e.g.
      "1280MiB", "7.5GiB").
    num_local_ssds: int. The number of local SSDs to attach to the instance.
    preemptible: boolean. True if the VM should be preemptible, False otherwise.
    project: string or None. The project to create the VM in.
    image_family: string or None. The image family used to locate the image.
    image_project: string or None. The image project used to locate the
      specified image.
    boot_disk_size: None or int. The size of the boot disk in GB.
    boot_disk_type: string or None. The type of the boot disk.
    boot_disk_iops: None or int. I/O operations per second
    boot_disk_throughput: None or int. Number of throughput mb per second
  """

  CLOUD = provider_info.GCP

  def __init__(self, *args, **kwargs):
    # Pre-declare decoded options (populated during super().__init__) so
    # type checkers know about them. They are None until decoding runs.
    self.num_local_ssds: int | None = None
    self.preemptible: bool | None = None
    self.boot_disk_size: int | None = None
    self.boot_disk_type: str | None = None
    self.boot_disk_iops: int | None = None
    self.boot_disk_throughput: int | None = None
    self.project: str | None = None
    self.image_family: str | None = None
    self.image_project: str | None = None
    self.node_type: str | None = None
    self.min_cpu_platform: str | None = None
    self.threads_per_core: int | None = None
    self.visible_core_count: int | None = None
    self.gce_tags: List[str] | None = None
    self.min_node_cpus: int | None = None
    self.subnet_names: List[str] | None = None
    super().__init__(*args, **kwargs)
    # Bundle the boot-disk options into a single spec for GceBootDisk.
    self.boot_disk_spec = boot_disk.BootDiskSpec(
        self.boot_disk_size,
        self.boot_disk_type,
        self.boot_disk_iops,
        self.boot_disk_throughput,
    )
    if isinstance(
        self.machine_type, custom_virtual_machine_spec.CustomMachineTypeSpec
    ):
      # {cpus, memory} dict syntax implies an n1 custom machine; warn users
      # who may have wanted a different machine family.
      logging.warning(
          'Specifying a custom machine in the format of '
          '{cpus: [NUMBER_OF_CPUS], memory: [GB_OF_MEMORY]} '
          'creates a custom machine in the n1 machine family. '
          'To create custom machines in other machine families, '
          'use [MACHINE_FAMILY]-custom-[NUMBER_CPUS]-[NUMBER_MiB] '
          'nomenclature. e.g. n2-custom-2-4096.'
      )
      self.cpus = self.machine_type.cpus
      self.memory = self.machine_type.memory
      self.machine_type = None
    else:
      self.cpus = None
      self.memory = None

  @classmethod
  def _ApplyFlags(cls, config_values, flag_values):
    """Modifies config options based on runtime flag values.

    Can be overridden by derived classes to add support for specific flags.

    Args:
      config_values: dict mapping config option names to provided values. May be
        modified by this function.
      flag_values: flags.FlagValues. Runtime flags that may override the
        provided config values.
    """
    super()._ApplyFlags(config_values, flag_values)
    if flag_values['gce_num_local_ssds'].present:
      config_values['num_local_ssds'] = flag_values.gce_num_local_ssds
    if flag_values['gce_ssd_interface'].present:
      config_values['ssd_interface'] = flag_values.gce_ssd_interface
    if flag_values['gce_preemptible_vms'].present:
      config_values['preemptible'] = flag_values.gce_preemptible_vms
    if flag_values['gce_boot_disk_size'].present:
      config_values['boot_disk_size'] = flag_values.gce_boot_disk_size
    if flag_values['gce_boot_disk_throughput'].present:
      config_values['boot_disk_throughput'] = (
          flag_values.gce_boot_disk_throughput
      )
    if flag_values['gce_boot_disk_iops'].present:
      config_values['boot_disk_iops'] = flag_values.gce_boot_disk_iops
    if flag_values['gce_boot_disk_type'].present:
      config_values['boot_disk_type'] = flag_values.gce_boot_disk_type
    if flag_values['machine_type'].present:
      # The flag may hold either a plain type name or a YAML dict of
      # {cpus, memory}; safe_load handles both.
      config_values['machine_type'] = yaml.safe_load(flag_values.machine_type)
    if flag_values['project'].present:
      config_values['project'] = flag_values.project
    if flag_values['image_family'].present:
      config_values['image_family'] = flag_values.image_family
    if flag_values['image_project'].present:
      config_values['image_project'] = flag_values.image_project
    if flag_values['gcp_node_type'].present:
      config_values['node_type'] = flag_values.gcp_node_type
    if flag_values['gcp_min_cpu_platform'].present:
      if (
          flag_values.gcp_min_cpu_platform
          != gcp_flags.GCP_MIN_CPU_PLATFORM_NONE
      ):
        config_values['min_cpu_platform'] = flag_values.gcp_min_cpu_platform
      else:
        # Specifying gcp_min_cpu_platform explicitly removes any config.
        config_values.pop('min_cpu_platform', None)
    if flag_values['disable_smt'].present and flag_values.disable_smt:
      config_values['threads_per_core'] = 1
    if flag_values['visible_core_count'].present:
      config_values['visible_core_count'] = flag_values.visible_core_count
    # Convert YAML to correct type even if only one element.
    if 'gce_tags' in config_values and isinstance(
        config_values['gce_tags'], str
    ):
      config_values['gce_tags'] = [config_values['gce_tags']]
    if flag_values['gce_tags'].present:
      config_values['gce_tags'] = flag_values.gce_tags
    if flag_values['gce_subnet_name'].present:
      config_values['subnet_names'] = flag_values.gce_subnet_name

  @classmethod
  def _GetOptionDecoderConstructions(cls):
    """Gets decoder classes and constructor args for each configurable option.

    Returns:
      dict. Maps option name string to a (ConfigOptionDecoder class, dict) pair.
        The pair specifies a decoder class and its __init__() keyword
        arguments to construct in order to decode the named option.
    """
    result = super()._GetOptionDecoderConstructions()
    result.update({
        'machine_type': (
            custom_virtual_machine_spec.MachineTypeDecoder,
            {'default': None},
        ),
        'ssd_interface': (
            option_decoders.StringDecoder,
            {'default': 'NVME'},
        ),
        'num_local_ssds': (
            option_decoders.IntDecoder,
            {'default': 0, 'min': 0},
        ),
        'preemptible': (option_decoders.BooleanDecoder, {'default': False}),
        'boot_disk_size': (option_decoders.IntDecoder, {'default': None}),
        'boot_disk_type': (
            option_decoders.StringDecoder,
            {'default': None},
        ),
        'boot_disk_iops': (option_decoders.IntDecoder, {'default': None}),
        'boot_disk_throughput': (
            option_decoders.IntDecoder,
            {'default': None},
        ),
        'project': (option_decoders.StringDecoder, {'default': None}),
        'image_family': (option_decoders.StringDecoder, {'default': None}),
        'image_project': (option_decoders.StringDecoder, {'default': None}),
        'node_type': (
            option_decoders.StringDecoder,
            {'default': 'n1-node-96-624'},
        ),
        'min_cpu_platform': (
            option_decoders.StringDecoder,
            {'default': None},
        ),
        'threads_per_core': (option_decoders.IntDecoder, {'default': None}),
        'visible_core_count': (option_decoders.IntDecoder, {'default': None}),
        'gce_tags': (
            option_decoders.ListDecoder,
            {
                'item_decoder': option_decoders.StringDecoder(),
                'default': None,
            },
        ),
        'subnet_names': (
            option_decoders.ListDecoder,
            {
                'item_decoder': option_decoders.StringDecoder(),
                'default': None,
            },
        ),
    })
    return result
class GceSoleTenantNodeTemplate(resource.BaseResource):
  """Object representing a GCE sole tenant node template.

  Attributes:
    name: string. The name of the node template.
    node_type: string. The node type of the node template.
    zone: string. The zone of the node template, converted to region.
  """

  def __init__(self, name, node_type, zone, project):
    super().__init__()
    self.name = name
    self.node_type = node_type
    # Node templates are regional resources, so the zone is mapped to its
    # enclosing region.
    self.region = util.GetRegionFromZone(zone)
    self.project = project

  def _Create(self):
    """Creates the node template."""
    cmd = util.GcloudCommand(
        self, 'compute', 'sole-tenancy', 'node-templates', 'create', self.name
    )
    cmd.flags.update({'node-type': self.node_type, 'region': self.region})
    _, stderr, retcode = cmd.Issue(raise_on_failure=False)
    util.CheckGcloudResponseKnownFailures(stderr, retcode)

  def _Exists(self):
    """Returns True if the node template exists."""
    cmd = util.GcloudCommand(
        self, 'compute', 'sole-tenancy', 'node-templates', 'describe', self.name
    )
    cmd.flags['region'] = self.region
    # A zero return code from 'describe' means the template is present.
    _, _, retcode = cmd.Issue(raise_on_failure=False)
    return retcode == 0

  def _Delete(self):
    """Deletes the node template."""
    cmd = util.GcloudCommand(
        self, 'compute', 'sole-tenancy', 'node-templates', 'delete', self.name
    )
    cmd.flags['region'] = self.region
    # Best effort: deletion failures are ignored.
    cmd.Issue(raise_on_failure=False)
class GceSoleTenantNodeGroup(resource.BaseResource):
  """Object representing a GCE sole tenant node group.

  Attributes:
    name: string. The name of the node group.
    node_template: string. The note template of the node group.
    zone: string. The zone of the node group.
  """

  # Shared counter producing unique per-run node-group indices.
  _counter_lock = threading.Lock()
  _counter = itertools.count()

  def __init__(self, node_type, zone, project):
    super().__init__()
    # Serialize counter access so concurrent creations get distinct names.
    with self._counter_lock:
      self.instance_number = next(self._counter)
    self.name = f'pkb-node-group-{FLAGS.run_uri}-{self.instance_number}'
    self.node_type = node_type
    self.node_template = None
    self.zone = zone
    self.project = project
    self.fill_fraction = 0.0

  def _Create(self):
    """Creates the host."""
    cmd = util.GcloudCommand(
        self, 'compute', 'sole-tenancy', 'node-groups', 'create', self.name
    )
    # The template must already exist (created in _CreateDependencies).
    assert self.node_template is not None
    cmd.flags.update(
        {'node-template': self.node_template.name, 'target-size': 1}
    )
    _, stderr, retcode = cmd.Issue(raise_on_failure=False)
    util.CheckGcloudResponseKnownFailures(stderr, retcode)

  def _CreateDependencies(self):
    """Creates the node template backing this group."""
    super()._CreateDependencies()
    template_name = self.name.replace('group', 'template')
    template = GceSoleTenantNodeTemplate(
        template_name, self.node_type, self.zone, self.project
    )
    template.Create()
    self.node_template = template

  def _DeleteDependencies(self):
    """Deletes the node template, if one was created."""
    if self.node_template:
      self.node_template.Delete()

  def _Exists(self):
    """Returns True if the host exists."""
    cmd = util.GcloudCommand(
        self, 'compute', 'sole-tenancy', 'node-groups', 'describe', self.name
    )
    _, _, retcode = cmd.Issue(raise_on_failure=False)
    return retcode == 0

  def _IsReady(self) -> bool:
    """Returns True if the node-group is ready."""
    cmd = util.GcloudCommand(
        self, 'compute', 'sole-tenancy', 'node-groups', 'describe', self.name
    )
    stdout, _, _ = cmd.Issue(raise_on_failure=False)
    status = json.loads(stdout).get('status')
    return status == 'READY'

  def _Delete(self):
    """Deletes the host."""
    cmd = util.GcloudCommand(
        self, 'compute', 'sole-tenancy', 'node-groups', 'delete', self.name
    )
    # Best effort: deletion failures are ignored.
    cmd.Issue(raise_on_failure=False)
def GenerateAcceleratorSpecString(accelerator_type, accelerator_count):
  """Generates a string to be used to attach accelerators to a VM using gcloud.

  This function takes a cloud-agnostic accelerator type (k80, p100, etc.) and
  returns a gce-specific accelerator name (nvidia-tesla-k80, etc).

  If FLAGS.gce_accelerator_type_override is specified, the value of said flag
  will be used as the name of the accelerator.

  Args:
    accelerator_type: cloud-agnostic accelerator type (p100, k80, etc.)
    accelerator_count: number of accelerators to attach to the VM

  Returns:
    String to be used by gcloud to attach accelerators to a VM.
    Must be prepended by the flag '--accelerator'.
  """
  if FLAGS.gce_accelerator_type_override:
    gce_accelerator_type = FLAGS.gce_accelerator_type_override
  else:
    # Tesla-generation GPUs use the legacy 'nvidia-tesla-' naming scheme.
    if accelerator_type in virtual_machine.TESLA_GPU_TYPES:
      prefix = _GCE_NVIDIA_TESLA_GPU_PREFIX
    else:
      prefix = _GCE_NVIDIA_GPU_PREFIX
    gce_accelerator_type = prefix + accelerator_type
  return f'type={gce_accelerator_type},count={accelerator_count}'
def GetArmArchitecture(machine_type):
  """Returns the specific ARM processor architecture of the VM."""
  if not machine_type:
    return None
  # Take the family prefix before the first dash, also dropping a trailing
  # 'd' or 'n' variant letter (e.g. 't2a-standard-1' -> 't2a').
  family = re.split(r'[dn]?\-', machine_type)[0]
  return _MACHINE_TYPE_PREFIX_TO_ARM_ARCH.get(family)
class GceVirtualMachine(virtual_machine.BaseVirtualMachine):
  """Object representing a Google Compute Engine Virtual Machine."""

  CLOUD = provider_info.GCP
  # Subclasses may override this with a concrete image name.
  DEFAULT_IMAGE = None
  # Starting device index used when enumerating local NVMe disks.
  # NOTE(review): presumably index 0 is reserved for the boot disk — confirm
  # against the disk strategy code.
  NVME_START_INDEX = 1
  # Class-level sole-tenant host bookkeeping, shared by all instances and
  # guarded by _host_lock.
  _host_lock = threading.Lock()
  deleted_hosts = set()
  host_map = collections.defaultdict(list)
  # Semaphore and file names used by the live-migration (LM) notification
  # machinery (handlers are defined outside this chunk).
  _LM_TIMES_SEMAPHORE = threading.Semaphore(0)
  _LM_NOTICE_SCRIPT = 'gce_maintenance_notice.py'
  _LM_SIGNAL_LOG = 'lm_signal.log'
  _LM_NOTICE_LOG = 'gce_maintenance_notice.log'
  def __init__(self, vm_spec):
    """Initialize a GCE virtual machine.

    Args:
      vm_spec: virtual_machine.BaseVmSpec object of the vm.

    Raises:
      errors.Config.MissingOption: If the spec does not include a "machine_type"
        or both "cpus" and "memory".
      errors.Config.InvalidValue: If the spec contains both "machine_type" and
        at least one of "cpus" or "memory".
    """
    super().__init__(vm_spec)
    # Built lazily in _CreateDependencies once the SSH key file exists.
    self.create_cmd: util.GcloudCommand = None
    self.boot_metadata = {}
    self.boot_metadata_from_file = {}
    if self.boot_startup_script:
      self.boot_metadata_from_file['startup-script'] = self.boot_startup_script
    self.ssd_interface = vm_spec.ssd_interface
    if (
        self.machine_type
        and self.machine_type in gce_disk.FIXED_SSD_MACHINE_TYPES
    ):
      # Machine types with a fixed local-SSD configuration use NVME.
      self.ssd_interface = 'NVME'
    self.cpus = vm_spec.cpus
    self.image = self.image or self.DEFAULT_IMAGE
    self.memory_mib = vm_spec.memory
    self.preemptible = vm_spec.preemptible
    self.spot_early_termination = False
    self.preemptible_status_code = None
    self.project = vm_spec.project or util.GetDefaultProject()
    self.image_project = vm_spec.image_project or self.GetDefaultImageProject()
    self.mtu: int | None = FLAGS.mtu
    self.subnet_names = vm_spec.subnet_names
    self.network = self._GetNetwork()
    self.firewall = gce_network.GceFirewall.GetFirewall()
    self.boot_disk = gce_disk.GceBootDisk(self, vm_spec.boot_disk_spec)
    self.disks = [self.boot_disk]
    # Numeric instance ID; populated by _ParseDescribeResponse.
    self.id = None
    self.node_type = vm_spec.node_type
    self.host = None
    self.use_dedicated_host = vm_spec.use_dedicated_host
    self.num_vms_per_host = vm_spec.num_vms_per_host
    self.min_cpu_platform = vm_spec.min_cpu_platform
    self.threads_per_core = vm_spec.threads_per_core
    self.visible_core_count = vm_spec.visible_core_count
    self.gce_remote_access_firewall_rule = FLAGS.gce_remote_access_firewall_rule
    self.gce_accelerator_type_override = FLAGS.gce_accelerator_type_override
    self.gce_tags = vm_spec.gce_tags
    self.gce_network_tier = FLAGS.gce_network_tier
    self.gce_nic_types = FLAGS.gce_nic_types
    self.max_local_disks = vm_spec.num_local_ssds
    if (
        self.machine_type
        and self.machine_type in gce_disk.FIXED_SSD_MACHINE_TYPES
    ):
      # Fixed-SSD machine types override any user-requested local SSD count.
      self.max_local_disks = gce_disk.FIXED_SSD_MACHINE_TYPES[self.machine_type]
    # For certain machine families, we need to explicitly set the GPU type
    # and counts. See the _FIXED_GPU_MACHINE_TYPES dictionary for more details.
    if self.machine_type and self.machine_type in _FIXED_GPU_MACHINE_TYPES:
      self.gpu_type = _FIXED_GPU_MACHINE_TYPES[self.machine_type][0]
      self.gpu_count = _FIXED_GPU_MACHINE_TYPES[self.machine_type][1]
    # Fall back to VIRTIO_NET on any NIC where GVNIC is not supported.
    for idx, gce_nic_type in enumerate(self.gce_nic_types):
      if gce_nic_type == 'GVNIC' and not self.SupportGVNIC():
        logging.warning('Changing gce_nic_type to VIRTIO_NET')
        self.gce_nic_types[idx] = 'VIRTIO_NET'
    self.gce_egress_bandwidth_tier = gcp_flags.EGRESS_BANDWIDTH_TIER.value
    self.gce_shielded_secure_boot = FLAGS.gce_shielded_secure_boot
    # Default to GCE default (Live Migration)
    self.on_host_maintenance = None
    # https://cloud.google.com/compute/docs/instances/live-migration#gpusmaintenance
    # https://cloud.google.com/compute/docs/instances/define-instance-placement#restrictions
    # TODO(pclay): Update if this assertion ever changes
    if (
        FLAGS['gce_migrate_on_maintenance'].present
        and FLAGS.gce_migrate_on_maintenance
        and (self.gpu_count or self.network.placement_group)
    ):
      raise errors.Config.InvalidValue(
          'Cannot set flag gce_migrate_on_maintenance on instances with GPUs '
          'or network placement groups, as it is not supported by GCP.'
      )
    if (
        not FLAGS.gce_migrate_on_maintenance
        or self.gpu_count
        or self.network.placement_group
        or self.preemptible
    ):
      self.on_host_maintenance = 'TERMINATE'
    else:
      self.on_host_maintenance = 'MIGRATE'
    self.automatic_restart = FLAGS.gce_automatic_restart
    if self.preemptible:
      # GCS object written by the shutdown script when the VM is preempted.
      self.preempt_marker = f'gs://{FLAGS.gcp_preemptible_status_bucket}/{FLAGS.run_uri}/{self.name}'
    arm_arch = GetArmArchitecture(self.machine_type)
    if arm_arch:
      # Assign host_arch to avoid running detect_host on ARM
      self.host_arch = arm_arch
      self.is_aarch64 = True
    self.image_family = vm_spec.image_family or self.GetDefaultImageFamily(
        self.is_aarch64
    )
    self.create_disk_strategy = gce_disk_strategies.GetCreateDiskStrategy(
        self, None, 0
    )
def _GetNetwork(self):
"""Returns the GceNetwork to use."""
return gce_network.GceNetwork.GetNetwork(self)
@property
def host_list(self):
"""Returns the list of hosts that are compatible with this VM."""
return self.host_map[(self.project, self.zone)]
  def _GenerateCreateCommand(self, ssh_keys_path):
    """Generates a command to create the VM instance.

    Args:
      ssh_keys_path: string. Path to a file containing the sshKeys metadata.

    Returns:
      GcloudCommand. gcloud command to issue in order to create the VM instance.
    """
    args = ['compute', 'instances', 'create', self.name]
    cmd = util.GcloudCommand(self, *args)
    # Create asynchronously; the returned operation is polled elsewhere.
    cmd.flags['async'] = True
    if gcp_flags.GCE_CREATE_LOG_HTTP.value:
      cmd.flags['log-http'] = True
    if gcp_flags.GCE_NODE_GROUP.value:
      cmd.flags['node-group'] = gcp_flags.GCE_NODE_GROUP.value
    # Compute all flags requiring alpha first. Then if any flags are different
    # between alpha and GA, we can set the appropriate ones.
    if self.gce_egress_bandwidth_tier:
      network_performance_configs = (
          f'total-egress-bandwidth-tier={self.gce_egress_bandwidth_tier}'
      )
      cmd.flags['network-performance-configs'] = network_performance_configs
      self.metadata['gce_egress_bandwidth_tier'] = (
          self.gce_egress_bandwidth_tier
      )
    if gcp_flags.GCE_CONFIDENTIAL_COMPUTE.value:
      # TODO(pclay): remove when on-host-maintenance gets promoted to GA
      cmd.use_alpha_gcloud = True
      if gcp_flags.GCE_CONFIDENTIAL_COMPUTE_TYPE.value == 'sev':
        cmd.flags.update({'confidential-compute-type': 'SEV'})
      # Confidential compute always forces TERMINATE on host maintenance.
      cmd.flags.update({'on-host-maintenance': 'TERMINATE'})
    elif self.on_host_maintenance:
      # TODO(pclay): remove when on-host-maintenance gets promoted to GA
      maintenance_flag = 'maintenance-policy'
      if cmd.use_alpha_gcloud:
        maintenance_flag = 'on-host-maintenance'
      cmd.flags[maintenance_flag] = self.on_host_maintenance
    # Prefer explicit subnets when the network defines them.
    if self.network.subnet_resources:
      net_resources = self.network.subnet_resources
      ni_arg_name = 'subnet'
    else:
      net_resources = self.network.network_resources
      ni_arg_name = 'network'
    # Bundle network-related arguments with --network-interface
    # This flag is mutually exclusive with any of these flags:
    # --address, --network, --network-tier, --subnet, --private-network-ip.
    # gcloud compute instances create ... --network-interface=
    for idx, net_resource in enumerate(net_resources):
      gce_nic_type = self.gce_nic_types[idx].upper()
      gce_nic_queue_count_arg = []
      if gcp_flags.GCE_NIC_QUEUE_COUNTS.value[idx] != 'default':
        gce_nic_queue_count_arg = [
            f'queue-count={gcp_flags.GCE_NIC_QUEUE_COUNTS.value[idx]}'
        ]
      no_address_arg = []
      # Only the first NIC can get an external IP, and only when requested.
      if not self.assign_external_ip or idx > 0:
        no_address_arg = ['no-address']
      cmd.additional_flags += [
          '--network-interface',
          ','.join(
              [
                  f'{ni_arg_name}={net_resource.name}',
                  f'nic-type={gce_nic_type}',
                  f'network-tier={self.gce_network_tier.upper()}',
              ]
              + gce_nic_queue_count_arg
              + no_address_arg
          ),
      ]
    # An explicit image wins over an image family.
    if self.image:
      cmd.flags['image'] = self.image
    elif self.image_family:
      cmd.flags['image-family'] = self.image_family
      self.metadata['image_family'] = self.image_family
    if self.image_project is not None:
      cmd.flags['image-project'] = self.image_project
      self.metadata['image_project'] = self.image_project
    for disk_ in self.disks:
      cmd.flags.update(disk_.GetCreationCommand())
    if self.machine_type is None:
      # Custom machine: shape comes from cpus + memory, not a type name.
      cmd.flags['custom-cpu'] = self.cpus
      cmd.flags['custom-memory'] = '{}MiB'.format(self.memory_mib)
    else:
      cmd.flags['machine-type'] = self.machine_type
    if self.min_cpu_platform:
      cmd.flags['min-cpu-platform'] = self.min_cpu_platform
    # metal instances do not support disable SMT.
    if self.threads_per_core and 'metal' not in self.machine_type:
      cmd.flags['threads-per-core'] = self.threads_per_core
      self.metadata['threads_per_core'] = self.threads_per_core
    if self.visible_core_count:
      cmd.flags['visible-core-count'] = self.visible_core_count
      self.metadata['visible_core_count'] = self.visible_core_count
    # Fixed-GPU machine types already imply their accelerators, so only pass
    # --accelerator for custom machines or non-fixed machine types.
    if self.gpu_count and (
        self.cpus
        or (
            self.machine_type
            and self.machine_type not in _FIXED_GPU_MACHINE_TYPES
        )
    ):
      cmd.flags['accelerator'] = GenerateAcceleratorSpecString(
          self.gpu_type, self.gpu_count
      )
    cmd.flags['tags'] = ','.join(['perfkitbenchmarker'] + (self.gce_tags or []))
    if not self.automatic_restart:
      cmd.flags['no-restart-on-failure'] = True
    self.metadata['automatic_restart'] = self.automatic_restart
    if self.host:
      cmd.flags['node-group'] = self.host.name
    if self.gce_shielded_secure_boot:
      cmd.flags['shielded-secure-boot'] = True
    if self.network.placement_group:
      self.metadata.update(self.network.placement_group.GetResourceMetadata())
      cmd.flags['resource-policies'] = self.network.placement_group.name
    else:
      self.metadata['placement_group_style'] = (
          placement_group.PLACEMENT_GROUP_NONE
      )
    # Metadata provided via files; user-supplied keys cannot override the
    # internally-managed ones (e.g. sshKeys).
    metadata_from_file = {'sshKeys': ssh_keys_path}
    if self.boot_metadata_from_file:
      metadata_from_file.update(self.boot_metadata_from_file)
    parsed_metadata_from_file = flag_util.ParseKeyValuePairs(
        FLAGS.gcp_instance_metadata_from_file
    )
    for key, value in parsed_metadata_from_file.items():
      if key in metadata_from_file:
        logging.warning(
            (
                'Metadata "%s" is set internally. Cannot be overridden '
                'from command line.'
            ),
            key,
        )
        continue
      metadata_from_file[key] = value
    cmd.flags['metadata-from-file'] = ','.join(
        ['%s=%s' % (k, v) for k, v in metadata_from_file.items()]
    )
    # passing sshKeys does not work with OS Login
    metadata = {'enable-oslogin': 'FALSE'}
    metadata.update(self.boot_metadata)
    metadata.update(util.GetDefaultTags())
    # Signal (along with timeout_utc) that VM is short lived.
    metadata['vm_nature'] = 'ephemeral'
    additional_metadata = {}
    additional_metadata.update(self.vm_metadata)
    additional_metadata.update(
        flag_util.ParseKeyValuePairs(FLAGS.gcp_instance_metadata)
    )
    for key, value in additional_metadata.items():
      if key in metadata:
        logging.warning(
            (
                'Metadata "%s" is set internally. Cannot be overridden '
                'from command line.'
            ),
            key,
        )
        continue
      metadata[key] = value
    if self.preemptible:
      cmd.flags['preemptible'] = True
      # Install the shutdown-script metadata used to detect preemption.
      metadata.update([self._PreemptibleMetadataKeyValue()])
    cmd.flags['metadata'] = util.FormatTags(metadata)
    if (
        self.machine_type is None
        or self.machine_type not in gce_disk.FIXED_SSD_MACHINE_TYPES
    ):
      # Append the `--local-ssd` args only when it's a customized or old-gen VM.
      cmd.flags['local-ssd'] = [
          'interface={}'.format(self.ssd_interface)
      ] * self.max_local_disks
    cmd.flags.update(self.create_disk_strategy.GetCreationCommand())
    if gcp_flags.GCE_VM_SERVICE_ACCOUNT.value:
      cmd.flags['service-account'] = gcp_flags.GCE_VM_SERVICE_ACCOUNT.value
    if gcp_flags.GCLOUD_SCOPES.value:
      # Scopes may be separated by commas, semicolons, or spaces.
      cmd.flags['scopes'] = ','.join(
          re.split(r'[,; ]', gcp_flags.GCLOUD_SCOPES.value)
      )
    if gcp_flags.GCE_PERFORMANCE_MONITORING_UNIT.value:
      cmd.flags['performance-monitoring-unit'] = (
          gcp_flags.GCE_PERFORMANCE_MONITORING_UNIT.value
      )
    cmd.flags['labels'] = util.MakeFormattedDefaultTags()
    return cmd
def _AddShutdownScript(self):
cmd = util.GcloudCommand(
self, 'compute', 'instances', 'add-metadata', self.name
)
key, value = self._PreemptibleMetadataKeyValue()
cmd.flags['metadata'] = f'{key}={value}'
cmd.Issue()
def _RemoveShutdownScript(self):
# Removes shutdown script which copies status when it is interrupted
cmd = util.GcloudCommand(
self, 'compute', 'instances', 'remove-metadata', self.name
)
key, _ = self._PreemptibleMetadataKeyValue()
cmd.flags['keys'] = key
cmd.Issue(raise_on_failure=False)
  def Reboot(self):
    """Reboots the VM.

    For preemptible VMs the shutdown script is removed before the reboot (so
    the reboot is not mistaken for a preemption) and reinstalled afterwards.
    """
    if self.preemptible:
      self._RemoveShutdownScript()
    super().Reboot()
    if self.preemptible:
      self._AddShutdownScript()
def _Start(self):
"""Starts the VM."""
start_cmd = util.GcloudCommand(
self, 'compute', 'instances', 'start', self.name
)
# After start, IP address is changed
stdout, _, _ = start_cmd.Issue()
response = json.loads(stdout)
# Response is a list of size one
self._ParseDescribeResponse(response[0])
def _Stop(self):
"""Stops the VM."""
stop_cmd = util.GcloudCommand(
self, 'compute', 'instances', 'stop', self.name
)
stop_cmd.Issue()
def _PreDelete(self):
super()._PreDelete()
if self.preemptible:
self._RemoveShutdownScript()
  def _Create(self):
    """Create a GCE VM instance."""
    stdout, stderr, retcode = self.create_cmd.Issue(
        timeout=_GCE_VM_CREATE_TIMEOUT, raise_on_failure=False
    )
    # Save the create operation name for use in _WaitUntilRunning
    # ('name' only appears in stdout when the async create returned a JSON
    # operation resource; on failure stdout may be empty).
    if 'name' in stdout:
      response = json.loads(stdout)
      self.create_operation_name = response[0]['name']
    # Classify any failure; raises the appropriate typed error.
    self._ParseCreateErrors(self.create_cmd.rate_limited, stderr, retcode)
    if not self.create_return_time:
      self.create_return_time = time.time()
  def _ParseCreateErrors(
      self, cmd_rate_limited: bool, stderr: str, retcode: int
  ):
    """Parse error messages from a command in order to classify a failure.

    Args:
      cmd_rate_limited: whether the create command was rate limited.
      stderr: stderr output of the create command.
      retcode: return code of the create command.

    Raises:
      errors.Resource.CreationError: on unclassified or fatal failures.
      errors.Resource.RetryableCreationError: on failures worth retrying.
      errors.Resource.CreationInternalError: on GCE internal errors.
      errors.Benchmarks.InsufficientCapacityCloudFailure: on stockouts or
        preemption before start.
      errors.Benchmarks.QuotaFailure.RateLimitExceededError: on rate limits.
      errors.Benchmarks.UnsupportedConfigError: on configs GCE rejects.
      errors.Benchmarks.KnownIntermittentError: on known flaky failures.
    """
    # Snapshot the host count now so we can tell below whether another thread
    # already added a new host while we waited for the lock.
    num_hosts = len(self.host_list)
    if (
        self.use_dedicated_host
        and retcode
        and _INSUFFICIENT_HOST_CAPACITY in stderr
    ):
      if self.num_vms_per_host:
        # The user pinned a VM-per-host count that simply does not fit.
        raise errors.Resource.CreationError(
            'Failed to create host: %d vms of type %s per host exceeds '
            'memory capacity limits of the host'
            % (self.num_vms_per_host, self.machine_type)
        )
      else:
        logging.warning(
            'Creation failed due to insufficient host capacity. A new host will'
            ' be created and instance creation will be retried.'
        )
        with self._host_lock:
          # Only create a new host if no other thread beat us to it.
          if num_hosts == len(self.host_list):
            host = GceSoleTenantNodeGroup(
                self.node_type, self.zone, self.project
            )
            self.host_list.append(host)
            host.Create()
          self.host = self.host_list[-1]
        raise errors.Resource.RetryableCreationError()
    if (
        not self.use_dedicated_host
        and retcode
        and _INSUFFICIENT_HOST_CAPACITY in stderr
    ):
      # Regular (non-sole-tenant) stockout: nothing we can retry against.
      logging.error(util.STOCKOUT_MESSAGE)
      raise errors.Benchmarks.InsufficientCapacityCloudFailure(
          util.STOCKOUT_MESSAGE
      )
    util.CheckGcloudResponseKnownFailures(stderr, retcode)
    if retcode:
      if (
          cmd_rate_limited
          and 'already exists' in stderr
          and FLAGS.retry_on_rate_limited
      ):
        # Gcloud create commands may still create VMs despite being rate
        # limited.
        return
      if util.RATE_LIMITED_MESSAGE in stderr:
        raise errors.Benchmarks.QuotaFailure.RateLimitExceededError(stderr)
      if self.preemptible and _FAILED_TO_START_DUE_TO_PREEMPTION in stderr:
        self.spot_early_termination = True
        raise errors.Benchmarks.InsufficientCapacityCloudFailure(
            'Interrupted before VM started'
        )
      if (
          re.search(r"subnetworks/\S+' is not ready", stderr)
          and gcp_flags.RETRY_GCE_SUBNETWORK_NOT_READY.value
      ):
        # Commonly occurs when simultaneously creating GKE clusters
        raise errors.Resource.RetryableCreationError(
            f'subnet is currently being updated:\n{stderr}'
        )
      if "Invalid value for field 'resource.machineType'" in stderr:
        raise errors.Benchmarks.UnsupportedConfigError(stderr)
      if re.search("The resource '.*' was not found", stderr):
        raise errors.Benchmarks.UnsupportedConfigError(stderr)
      if 'features are not compatible for creating instance' in stderr:
        raise errors.Benchmarks.UnsupportedConfigError(stderr)
      if 'Internal error.' in stderr:
        raise errors.Resource.CreationInternalError(stderr)
      if re.search(
          "CPU platform type with name '.*' does not exist in zone", stderr
      ):
        raise errors.Benchmarks.UnsupportedConfigError(stderr)
      if re.search(
          r'HTTPError 400: .* can not be used without accelerator\(s\) in zone',
          stderr,
      ):
        raise errors.Benchmarks.UnsupportedConfigError(stderr)
      if 'The service is currently unavailable' in stderr:
        raise errors.Benchmarks.KnownIntermittentError(stderr)
      # The initial request failed, prompting a retry, but the instance was
      # still successfully created, which results in
      # '409: The resource X already exists'
      if '(gcloud.compute.instances.create) HTTPError 409' in stderr:
        raise errors.Benchmarks.KnownIntermittentError(stderr)
      # Occurs when creating a large number >30 of VMs in parallel.
      if 'gcloud crashed (SSLError)' in stderr:
        raise errors.Resource.RetryableCreationError(stderr)
      raise errors.Resource.CreationError(
          f'Failed to create VM {self.name}:\n{stderr}\nreturn code: {retcode}'
      )
  def _CreateDependencies(self):
    """Creates resources the VM depends on before the create command runs.

    Opens remote-access firewall ports, allocates (or reuses) a sole-tenant
    host when requested, and writes the SSH public key to a temp file that the
    generated create command references as metadata.
    """
    super()._CreateDependencies()
    # Create necessary VM access rules *prior* to creating the VM, such that it
    # doesn't affect boot time.
    self.AllowRemoteAccessPorts()
    if self.use_dedicated_host:
      with self._host_lock:
        # Allocate a new host when none exists yet, or when adding one more VM
        # to the newest host would push its fill fraction past 1.0.
        if not self.host_list or (
            self.num_vms_per_host
            and self.host_list[-1].fill_fraction + 1.0 / self.num_vms_per_host
            > 1.0
        ):
          host = GceSoleTenantNodeGroup(self.node_type, self.zone, self.project)
          self.host_list.append(host)
          if gcp_flags.GCE_NODE_GROUP.value is None:
            # GCE_NODE_GROUP is used to identify an existing node group.
            host.Create()
        self.host = self.host_list[-1]
        if self.num_vms_per_host:
          self.host.fill_fraction += 1.0 / self.num_vms_per_host
    # Capture the public key, write it to a temp file, and save the filename.
    with open(self.ssh_public_key) as f:
      ssh_public_key = f.read().rstrip('\n')
    # delete=False: the file must outlive this method since the create command
    # references it by name.
    with vm_util.NamedTemporaryFile(
        mode='w', dir=vm_util.GetTempDir(), prefix='key-metadata', delete=False
    ) as tf:
      tf.write('%s:%s\n' % (self.user_name, ssh_public_key))
      tf.close()
      self.create_cmd = self._GenerateCreateCommand(tf.name)
def _DeleteDependencies(self):
if self.host:
with self._host_lock:
if self.host in self.host_list:
self.host_list.remove(self.host)
if self.host not in self.deleted_hosts:
self.host.Delete()
self.deleted_hosts.add(self.host)
def _ParseDescribeResponse(self, describe_response):
"""Sets the ID and IP addresses from a response to the describe command.
Args:
describe_response: JSON-loaded response to the describe gcloud command.
Raises:
KeyError, IndexError: If the ID and IP addresses cannot be parsed.
"""
self.id = describe_response['id']
network_interface = describe_response['networkInterfaces'][0]
self.internal_ip = network_interface['networkIP']
if 'accessConfigs' in network_interface:
self.ip_address = network_interface['accessConfigs'][0]['natIP']
for network_interface in describe_response['networkInterfaces']:
self.internal_ips.append(network_interface['networkIP'])
@property
def HasIpAddress(self):
"""Returns True when the IP has been retrieved from a describe response."""
return not self._NeedsToParseDescribeResponse()
def _NeedsToParseDescribeResponse(self):
"""Returns whether the ID and IP addresses still need to be set."""
return (
not self.id
or not self.GetInternalIPs()
or (self.assign_external_ip and not self.ip_address)
)
@vm_util.Retry()
def _PostCreate(self):
"""Get the instance's data."""
if self._NeedsToParseDescribeResponse():
getinstance_cmd = util.GcloudCommand(
self, 'compute', 'instances', 'describe', self.name
)
stdout, _, _ = getinstance_cmd.Issue()
response = json.loads(stdout)
self._ParseDescribeResponse(response)
for disk_ in self.disks:
disk_.PostCreate()
self.image = self.boot_disk.image
def _Delete(self):
"""Delete a GCE VM instance."""
delete_cmd = util.GcloudCommand(
self, 'compute', 'instances', 'delete', self.name
)
delete_cmd.Issue(raise_on_failure=False)
def _Suspend(self):
"""Suspend a GCE VM instance."""
util.GcloudCommand(
self, 'beta', 'compute', 'instances', 'suspend', self.name
).Issue()
def _Resume(self):
"""Resume a GCE VM instance."""
resume_cmd = util.GcloudCommand(
self, 'beta', 'compute', 'instances', 'resume', self.name
)
# After resume, IP address is refreshed
stdout, _, _ = resume_cmd.Issue()
response = json.loads(stdout)
# Response is a list of size one
self._ParseDescribeResponse(response[0])
@vm_util.Retry(
poll_interval=1,
log_errors=False,
retryable_exceptions=(GceServiceUnavailableError,),
)
def _Exists(self):
"""Returns true if the VM exists."""
getinstance_cmd = util.GcloudCommand(
self, 'compute', 'instances', 'describe', self.name
)
stdout, stderr, retcode = getinstance_cmd.Issue(raise_on_failure=False)
if 'The service is currently unavailable' in stderr:
logging.info('instances describe command failed, retrying.')
raise GceServiceUnavailableError()
elif retcode and re.search(r"The resource \'.*'\ was not found", stderr):
return False
response = json.loads(stdout)
status = response['status']
return status in INSTANCE_EXISTS_STATUSES
  @vm_util.Retry(
      poll_interval=1,
      log_errors=False,
      retryable_exceptions=(GceRetryDescribeOperationsError,),
  )
  def _WaitUntilRunning(self):
    """Waits until the VM instances create command completes.

    Polls `gcloud compute operations describe` (via the Retry decorator)
    until the create operation reaches DONE, then records the timestamp at
    which the VM became running.

    Raises:
      GceRetryDescribeOperationsError: when the describe call failed or the
        operation is not DONE yet; consumed by the Retry decorator.
    """
    getoperation_cmd = util.GcloudCommand(
        self, 'compute', 'operations', 'describe', self.create_operation_name
    )
    if gcp_flags.GCE_CREATE_LOG_HTTP.value:
      getoperation_cmd.flags['log-http'] = True
    stdout, _, retcode = getoperation_cmd.Issue(raise_on_failure=False)
    # A failed describe call is treated as transient and retried.
    if retcode != 0:
      logging.info('operations describe command failed, retrying.')
      raise GceRetryDescribeOperationsError()
    response = json.loads(stdout)
    status = response['status']
    # Classify errors once the operation is complete.
    if 'error' in response:
      create_stderr = json.dumps(response['error'])
      create_retcode = 1
      self._ParseCreateErrors(
          getoperation_cmd.rate_limited, create_stderr, create_retcode
      )
    # Retry if the operation is not yet DONE.
    elif status != OPERATION_DONE:
      logging.info(
          'VM create operation has status %s; retrying operations '
          'describe command.',
          status,
      )
      raise GceRetryDescribeOperationsError()
    # Collect the time-to-running timestamp once the operation completes.
    elif not self.is_running_time:
      self.is_running_time = time.time()
def SetDiskSpec(self, disk_spec, disk_count):
"""Sets Disk Specs of the current VM. Calls before the VM is created."""
self.create_disk_strategy = gce_disk_strategies.GetCreateDiskStrategy(
self, disk_spec, disk_count
)
def SetupAllScratchDisks(self):
"""Set up all scratch disks of the current VM."""
# Prepare vm scratch disks:
self.create_disk_strategy.GetSetupDiskStrategy().SetUpDisk()
def ReleaseIpReservation(self, ip_address_name: str) -> None:
"""Releases existing IP reservation.
Args:
ip_address_name: name of the IP reservation to be released.
"""
reserv_ip_address = gce_network.GceIPAddress(
self.project,
util.GetRegionFromZone(self.zone),
ip_address_name,
self.network.primary_subnet_name,
)
reserv_ip_address.Delete()
def AddMetadata(self, **kwargs):
"""Adds metadata to disk."""
# vm metadata added to vm on creation.
# Add metadata to boot disk
gce_disk.AddLabels(self, self.name)
self.create_disk_strategy.AddMetadataToDiskResource()
def AllowRemoteAccessPorts(self):
"""Creates firewall rules for remote access if required."""
# If gce_remote_access_firewall_rule is specified, access is already
# granted by that rule.
# If not, GCE firewall rules are created for all instances in a
# network.
if not self.gce_remote_access_firewall_rule:
super().AllowRemoteAccessPorts()
  def GetResourceMetadata(self):
    """Returns a dict containing metadata about the VM.

    Extends the base metadata with GCE-specific properties (dedicated host,
    GPU, local SSD, network/subnet, confidential compute) and per-disk
    metadata.

    Returns:
      dict mapping string property key to value.
    """
    result = super().GetResourceMetadata()
    # Only record attributes that are actually set on this VM.
    for attr_name in 'cpus', 'memory_mib', 'preemptible', 'project':
      attr_value = getattr(self, attr_name)
      if attr_value:
        result[attr_name] = attr_value
    if self.use_dedicated_host:
      result['node_type'] = self.node_type
      result['num_vms_per_host'] = self.num_vms_per_host
    if self.gpu_count:
      result['gpu_type'] = self.gpu_type
      result['gpu_count'] = self.gpu_count
    if self.gce_accelerator_type_override:
      result['accelerator_type_override'] = self.gce_accelerator_type_override
    if self.gce_tags:
      result['gce_tags'] = ','.join(self.gce_tags)
    if self.max_local_disks:
      result['gce_local_ssd_count'] = self.max_local_disks
      result['gce_local_ssd_interface'] = self.ssd_interface
    # self.network.network_resources can be None when subnet_names are populated
    network_resources = (
        self.network.network_resources or self.network.subnet_resources
    )
    result['gce_network_name'] = ','.join(
        network_resource.name for network_resource in network_resources
    )
    result['gce_subnet_name'] = ','.join(
        subnet_resource.name
        for subnet_resource in self.network.subnet_resources
    )
    result['gce_network_tier'] = self.gce_network_tier
    result['gce_nic_type'] = self.gce_nic_types
    result['gce_shielded_secure_boot'] = self.gce_shielded_secure_boot
    if self.visible_core_count:
      result['visible_core_count'] = self.visible_core_count
    if self.network.mtu:
      result['mtu'] = self.network.mtu
    if gcp_flags.GCE_CONFIDENTIAL_COMPUTE.value:
      result['confidential_compute'] = True
      result['confidential_compute_type'] = (
          gcp_flags.GCE_CONFIDENTIAL_COMPUTE_TYPE.value
      )
    # Merge per-disk metadata into the VM's metadata.
    for disk_ in self.disks:
      result.update(disk_.GetResourceMetadata())
    return result
def SimulateMaintenanceWithLog(self):
"""Create a json file with information related to the vm."""
simulate_maintenance_json = {
'current_time': datetime.datetime.now().timestamp() * 1000,
'instance_id': self.id,
'project': self.project,
'instance_name': self.name,
'zone': self.zone,
}
vm_path = posixpath.join(vm_util.GetTempDir(), self._LM_SIGNAL_LOG)
with open(vm_path, 'w+') as f:
json.dump(simulate_maintenance_json, f, indent=2, sort_keys=True)
def SimulateMaintenanceEvent(self):
"""Simulates a maintenance event on the VM."""
cmd = util.GcloudCommand(
self,
'compute',
'instances',
'simulate-maintenance-event',
self.name,
'--async',
)
logcmd = util.GcloudCommand(
None,
'logging',
'read',
'"protoPayload.methodName=v1.compute.instances.simulateMaintenanceEvent'
f' resource.labels.instance_id={self.id}"',
)
logcmd.flags['freshness'] = f'{LM_UNAVAILABLE_STATUS_WAIT_TIME_MIN}M'
stdout, _, retcode = cmd.Issue(raise_on_failure=False)
if retcode or 'error' in stdout:
raise errors.VirtualMachine.VirtualMachineError(
'Unable to simulate maintenance event.'
)
time.sleep(LM_UNAVAILABLE_STATUS_WAIT_TIME_MIN * 60)
stdout, _, retcode = logcmd.Issue(raise_on_failure=False)
# if the migration is temporarily unavailable, retry the migration command
if not retcode and 'MIGRATION_TEMPORARILY_UNAVAILABLE' in stdout:
stdout, _, retcode = cmd.Issue(raise_on_failure=False)
if retcode or 'error' in stdout:
raise errors.VirtualMachine.VirtualMachineError(
'Unable to simulate maintenance event.'
)
def SetupLMNotification(self):
"""Prepare environment for /scripts/gce_maintenance_notify.py script."""
self.Install('pip')
self.RemoteCommand('sudo pip3 install requests')
self.PushDataFile(self._LM_NOTICE_SCRIPT, vm_util.VM_TMP_DIR)
def _GetLMNotificationCommand(self):
"""Return Remote python execution command for LM notify script."""
vm_path = posixpath.join(vm_util.VM_TMP_DIR, self._LM_NOTICE_SCRIPT)
server_log = self._LM_NOTICE_LOG
return (
f'python3 {vm_path} {gcp_flags.LM_NOTIFICATION_METADATA_NAME.value} >'
f' {server_log} 2>&1'
)
  def _PullLMNoticeLog(self):
    """Pulls the LM notice log from the VM into the local temp directory."""
    self.PullFile(vm_util.GetTempDir(), self._LM_NOTICE_LOG)
def StartLMNotification(self):
"""Start meta-data server notification subscription."""
def _Subscribe():
self.RemoteCommand(
self._GetLMNotificationCommand(),
timeout=LM_NOTIFICATION_TIMEOUT_SECONDS,
ignore_failure=True,
)
self._PullLMNoticeLog()
logging.info('[LM Notify] Release live migration lock.')
self._LM_TIMES_SEMAPHORE.release()
logging.info('[LM Notify] Create live migration timestamp thread.')
t = threading.Thread(target=_Subscribe)
t.daemon = True
t.start()
def WaitLMNotificationRelease(self):
"""Block main thread until LM ended."""
logging.info('[LM Notify] Wait for live migration to finish.')
self._LM_TIMES_SEMAPHORE.acquire()
logging.info('[LM Notify] Live migration is done.')
def _ReadLMNoticeContents(self):
"""Read the contents of the LM Notice Log into a string."""
return self.RemoteCommand(f'cat {self._LM_NOTICE_LOG}')[0]
def CollectLMNotificationsTime(self):
"""Extract LM notifications from log file.
Sample Log file to parse:
Host_maintenance_start _at_ 1656555520.78123
Host_maintenance_end _at_ 1656557227.63631
Returns:
Live migration events timing info dictionary
"""
lm_total_time_key = 'LM_total_time'
lm_start_time_key = 'Host_maintenance_start'
lm_end_time_key = 'Host_maintenance_end'
events_dict = {
'machine_instance': self.instance_number,
lm_start_time_key: 0,
lm_end_time_key: 0,
lm_total_time_key: 0,
}
lm_times = self._ReadLMNoticeContents()
if not lm_times:
raise ValueError('Cannot collect lm times. Live Migration might failed.')
# Result may contain errors captured, so we need to skip them
for event_info in lm_times.splitlines():
event_info_parts = event_info.split(' _at_ ')
if len(event_info_parts) == 2:
events_dict[event_info_parts[0]] = event_info_parts[1]
events_dict[lm_total_time_key] = float(
events_dict[lm_end_time_key]
) - float(events_dict[lm_start_time_key])
return events_dict
def DownloadPreprovisionedData(
self,
install_path,
module_name,
filename,
timeout=virtual_machine.PREPROVISIONED_DATA_TIMEOUT,
):
"""Downloads a data file from a GCS bucket with pre-provisioned data.
Use --gce_preprovisioned_data_bucket to specify the name of the bucket.
Args:
install_path: The install path on this VM.
module_name: Name of the module associated with this data file.
filename: The name of the file that was downloaded.
timeout: Timeout value for downloading preprovisionedData, Five minutes by
default.
"""
# TODO(deitz): Add retry logic.
self.RobustRemoteCommand(
GenerateDownloadPreprovisionedDataCommand(
install_path, module_name, filename
),
timeout=timeout,
)
  def InstallCli(self):
    """Installs the gcloud cli on this GCP vm."""
    # The google_cloud_sdk PKB package provides the gcloud CLI.
    self.Install('google_cloud_sdk')
def ShouldDownloadPreprovisionedData(self, module_name, filename):
"""Returns whether or not preprovisioned data is available."""
logging.info('Testing if preprovisioned data is available %s', filename)
return FLAGS.gcp_preprovisioned_data_bucket and self.TryRemoteCommand(
GenerateStatPreprovisionedDataCommand(module_name, filename)
)
def _UpdateInterruptibleVmStatusThroughMetadataService(self):
_, _, retcode = vm_util.IssueCommand(
[FLAGS.gsutil_path, 'stat', self.preempt_marker], raise_on_failure=False
)
# The VM is preempted if the command exits without an error
self.spot_early_termination = not bool(retcode)
if self.WasInterrupted():
return
stdout, _ = self.RemoteCommand(self._MetadataPreemptCmd)
self.spot_early_termination = stdout.strip().lower() == 'true'
  @property
  def _MetadataPreemptCmd(self):
    """Command run on the VM to query the metadata server for preemption."""
    return _METADATA_PREEMPT_CMD
def _PreemptibleMetadataKeyValue(self) -> Tuple[str, str]:
"""See base class."""
return 'shutdown-script', _SHUTDOWN_SCRIPT.format(
preempt_marker=self.preempt_marker, user=self.user_name
)
  def _AcquireWritePermissionsLinux(self):
    # Delegates to the GCS service helper to grant this VM write access to
    # GCS (used by the preemption shutdown script to upload its marker).
    gcs.GoogleCloudStorageService.AcquireWritePermissionsLinux(self)
  def OnStartup(self):
    """See base class. Grants GCS write access on preemptible VMs."""
    super().OnStartup()
    if self.preemptible:
      # Prepare VM to use GCS. When an instance is interrupted, the shutdown
      # script will copy its status to a GCS bucket.
      self._AcquireWritePermissionsLinux()
def _UpdateInterruptibleVmStatusThroughApi(self):
# If the run has failed then do a check that could throw an exception.
vm_without_zone = copy.copy(self)
vm_without_zone.zone = None
gcloud_command = util.GcloudCommand(
vm_without_zone, 'compute', 'operations', 'list'
)
gcloud_command.flags['filter'] = f'targetLink.scope():{self.name}'
gcloud_command.flags['zones'] = self.zone
stdout, _, _ = gcloud_command.Issue()
self.spot_early_termination = any(
operation['operationType'] == 'compute.instances.preempted'
for operation in json.loads(stdout)
)
def UpdateInterruptibleVmStatus(self, use_api=False):
"""Updates the interruptible status if the VM was preempted."""
if not self.IsInterruptible():
return
if self.WasInterrupted():
return
try:
self._UpdateInterruptibleVmStatusThroughMetadataService()
except errors.VirtualMachine.RemoteCommandError as error:
if use_api and 'connection timed out' in str(error).lower():
self._UpdateInterruptibleVmStatusThroughApi()
  def IsInterruptible(self):
    """Returns whether this vm is an interruptible vm (spot vm).

    Returns: True if this vm is an interruptible vm (spot vm).
    """
    return self.preemptible
  def WasInterrupted(self):
    """Returns whether this spot vm was terminated early by GCP.

    Returns: True if this vm was terminated early by GCP.
    """
    # Set by the metadata-service / operations-API status checks above.
    return self.spot_early_termination
  def GetVmStatusCode(self):
    """Returns the early termination code if any.

    Returns: Early termination code.
    """
    return self.preemptible_status_code
  def GetInterruptableStatusPollSeconds(self):
    """Get seconds between preemptible status polls.

    Returns:
      Seconds between polls
    """
    # One hour; presumably to limit gsutil/metadata traffic — confirm before
    # tightening.
    return 3600
  def SupportGVNIC(self) -> bool:
    """Returns whether this VM class supports the gVNIC network interface."""
    return True
  def GetDefaultImageFamily(self, is_arm: bool) -> str | None:
    """Returns the default image family, or None when there is no default."""
    return None
  def GetDefaultImageProject(self) -> str | None:
    """Returns the default image project, or None when there is no default."""
    return None
def GetNumTeardownSkippedVms(self) -> int:
"""Returns the number of lingering VMs in this VM's project and zone."""
# compute instances list doesn't accept a --zone flag, so we need to drop
# the zone from the VM object and pass in --zones instead.
vm_without_zone = copy.copy(self)
vm_without_zone.zone = None
args = ['compute', 'instances', 'list']
cmd = util.GcloudCommand(vm_without_zone, *args)
cmd.flags['format'] = 'json'
cmd.flags['zones'] = self.zone
stdout, _, _ = cmd.Issue()
all_vms = json.loads(stdout)
num_teardown_skipped_vms = 0
for vm_json in all_vms:
for item in vm_json['metadata']['items']:
if (
item['key'] == PKB_SKIPPED_TEARDOWN_METADATA_KEY
and item['value'] == 'true'
):
num_teardown_skipped_vms += 1
continue
return num_teardown_skipped_vms
def UpdateTimeoutMetadata(self):
"""Updates the timeout metadata for the VM."""
new_timeout = datetime.datetime.now(datetime.UTC) + datetime.timedelta(
minutes=pkb_flags.SKIP_TEARDOWN_KEEP_UP_MINUTES.value
)
new_timeout = new_timeout.strftime(resource.METADATA_TIME_FORMAT)
args = ['compute', 'instances', 'add-metadata', self.name]
cmd = util.GcloudCommand(self, *args)
cmd.flags['metadata'] = (
f'{resource.TIMEOUT_METADATA_KEY}={new_timeout},'
f'{PKB_SKIPPED_TEARDOWN_METADATA_KEY}=true'
)
cmd.Issue()
class BaseLinuxGceVirtualMachine(GceVirtualMachine, linux_vm.BaseLinuxMixin):
  """Class supporting Linux GCE virtual machines.

  Currently looks for gVNIC capabilities.
  TODO(pclay): Make more generic and move to BaseLinuxMixin.
  """

  # ethtool properties output should match this regex
  _ETHTOOL_RE = re.compile(r'^(?P<key>.*?):\s*(?P<value>.*)\s*')
  # the "device" value in ethtool properties for gvnic
  _GVNIC_DEVICE_NAME = 'gve'

  # Subclasses should override the default image OR
  # both the image family and image_project.
  DEFAULT_X86_IMAGE_FAMILY = None
  DEFAULT_ARM_IMAGE_FAMILY = None
  DEFAULT_IMAGE_PROJECT = None
  SUPPORTS_GVNIC = True

  def __init__(self, vm_spec):
    super().__init__(vm_spec)
    # Populated by OnStartup once the driver version can be queried.
    self._gvnic_version = None

  def GetResourceMetadata(self):
    """See base class."""
    metadata = super().GetResourceMetadata().copy()
    if self._gvnic_version:
      metadata['gvnic_version'] = self._gvnic_version
    return metadata

  def OnStartup(self):
    """See base class. Sets the _gvnic_version."""
    super().OnStartup()
    self._gvnic_version = self.GetGvnicVersion()

  def GetGvnicVersion(self) -> str | None:
    """Returns the gvnic network driver version.

    Returns:
      The driver version of the first network device using the gve driver,
      or None when version recording is disabled or no gve device is found.

    Raises:
      ValueError: if a gve device is found but reports no version.
    """
    if not gcp_flags.GCE_NIC_RECORD_VERSION.value:
      return
    # The original code also accumulated all device properties into a dict
    # that was never read; that dead accumulation has been removed.
    for device_name in self._get_network_device_mtus():
      device = self._GetNetworkDeviceProperties(device_name)
      driver = device.get('driver')
      driver_version = device.get('version')
      if not driver:
        logging.error(
            'Network device %s lacks a driver %s', device_name, device
        )
      elif driver == self._GVNIC_DEVICE_NAME:
        logging.info('gvnic properties %s', device)
        if driver_version:
          return driver_version
        raise ValueError(f'No version in {device}')

  def _GetNetworkDeviceProperties(self, device_name: str) -> Dict[str, str]:
    """Returns a dict of the network device properties from `ethtool -i`."""
    # ethtool can exist under /usr/sbin or needs to be installed (debian9)
    if self.HasPackage('ethtool'):
      self.InstallPackages('ethtool')
    try:
      stdout, _ = self.RemoteCommand(
          f'PATH="${{PATH}}":/usr/sbin ethtool -i {device_name}'
      )
    except errors.VirtualMachine.RemoteCommandError:
      logging.info('ethtool not installed', exc_info=True)
      return {}
    properties = {}
    for line in stdout.splitlines():
      m = self._ETHTOOL_RE.match(line)
      if m:
        properties[m['key']] = m['value']
    return properties

  def SupportGVNIC(self) -> bool:
    """Returns the class-level gVNIC support flag."""
    return self.SUPPORTS_GVNIC

  # Use an explicit is_arm parameter to not accidentally assume a default
  def GetDefaultImageFamily(self, is_arm: bool) -> str:
    """Returns the default image family for this OS, adjusted for ARM.

    Raises:
      ValueError: if no x86 image family is declared for a non-ARM VM.
    """
    if is_arm:
      if self.DEFAULT_ARM_IMAGE_FAMILY:
        return self.DEFAULT_ARM_IMAGE_FAMILY
      assert 'arm64' not in self.DEFAULT_X86_IMAGE_FAMILY
      if 'amd64' in self.DEFAULT_X86_IMAGE_FAMILY:
        # New convention as of Ubuntu 23
        arm_image_family = self.DEFAULT_X86_IMAGE_FAMILY.replace(
            'amd64', 'arm64'
        )
      else:
        # Older convention
        arm_image_family = self.DEFAULT_X86_IMAGE_FAMILY + '-arm64'
      logging.info(
          'ARM image must be used; changing image to %s',
          arm_image_family,
      )
      return arm_image_family
    if not self.DEFAULT_X86_IMAGE_FAMILY:
      raise ValueError(
          'DEFAULT_X86_IMAGE_FAMILY can not be None for non-ARM vms.'
      )
    return self.DEFAULT_X86_IMAGE_FAMILY

  def GetDefaultImageProject(self) -> str:
    """Returns the default image project.

    Raises:
      ValueError: if the subclass declared no image project.
    """
    if not self.DEFAULT_IMAGE_PROJECT:
      raise ValueError('DEFAULT_IMAGE_PROJECT can not be None')
    return self.DEFAULT_IMAGE_PROJECT

  def GenerateAndCaptureSerialPortOutput(self, local_path: str) -> bool:
    """Generates and captures the serial port output for the remote VM.

    Args:
      local_path: The path to store the serial port output on the caller's
        machine.

    Returns:
      True if the serial port output was successfully generated and captured;
      False otherwise.
    """
    cmd = util.GcloudCommand(
        self, 'compute', 'instances', 'get-serial-port-output', self.name,
    )
    cmd.flags['zone'] = self.zone
    cmd.flags['port'] = 1
    stdout, _, retcode = cmd.Issue(raise_on_failure=False)
    if retcode != 0:
      logging.error('Failed to get serial port 1 output')
      return False
    with open(local_path, 'w') as f:
      f.write(stdout)
    return True
class Debian11BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine,
    linux_vm.Debian11Mixin,
):
  """Debian 11 GCE VM; ARM image selection is explicitly blocked."""

  DEFAULT_X86_IMAGE_FAMILY = 'debian-11'
  DEFAULT_IMAGE_PROJECT = 'debian-cloud'

  @property
  def DEFAULT_ARM_IMAGE_FAMILY(self):
    # Raising here makes any ARM image lookup fail fast.
    raise errors.Config.InvalidValue(
        'GCE does not support Debian 11 on ARM during LTS.'
    )
class Debian12BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.Debian12Mixin
):
  """Debian 12 GCE VM using the debian-cloud image project."""

  DEFAULT_X86_IMAGE_FAMILY = 'debian-12'
  DEFAULT_IMAGE_PROJECT = 'debian-cloud'
class Rhel8BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.Rhel8Mixin
):
  """RHEL 8 GCE VM using the rhel-cloud image project."""

  DEFAULT_X86_IMAGE_FAMILY = 'rhel-8'
  DEFAULT_IMAGE_PROJECT = 'rhel-cloud'
class Rhel9BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.Rhel9Mixin
):
  """RHEL 9 GCE VM using the rhel-cloud image project."""

  DEFAULT_X86_IMAGE_FAMILY = 'rhel-9'
  DEFAULT_IMAGE_PROJECT = 'rhel-cloud'
class RockyLinux8BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.RockyLinux8Mixin
):
  """Rocky Linux 8 GCE VM using the rocky-linux-cloud image project."""

  DEFAULT_X86_IMAGE_FAMILY = 'rocky-linux-8'
  DEFAULT_IMAGE_PROJECT = 'rocky-linux-cloud'
# https://cloud.google.com/blog/products/application-modernization/introducing-rocky-linux-optimized-for-google-cloud
class RockyLinux8OptimizedBasedGceVirtualMachine(
    RockyLinux8BasedGceVirtualMachine
):
  """Rocky Linux 8 GCE VM using the GCP-optimized image family."""

  OS_TYPE = os_types.ROCKY_LINUX8_OPTIMIZED
  DEFAULT_X86_IMAGE_FAMILY = 'rocky-linux-8-optimized-gcp'
class RockyLinux9BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.RockyLinux9Mixin
):
  """Rocky Linux 9 GCE VM using the rocky-linux-cloud image project."""

  DEFAULT_X86_IMAGE_FAMILY = 'rocky-linux-9'
  DEFAULT_IMAGE_PROJECT = 'rocky-linux-cloud'
class RockyLinux9OptimizedBasedGceVirtualMachine(
    RockyLinux9BasedGceVirtualMachine
):
  """Rocky Linux 9 GCE VM using the GCP-optimized image family."""

  OS_TYPE = os_types.ROCKY_LINUX9_OPTIMIZED
  DEFAULT_X86_IMAGE_FAMILY = 'rocky-linux-9-optimized-gcp'
class CentOsStream9BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.CentOsStream9Mixin
):
  """CentOS Stream 9 GCE VM using the centos-cloud image project."""

  DEFAULT_X86_IMAGE_FAMILY = 'centos-stream-9'
  DEFAULT_IMAGE_PROJECT = 'centos-cloud'
class BaseCosBasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.BaseContainerLinuxMixin
):
  """Base class for COS-based GCE virtual machines."""

  BASE_OS_TYPE = os_types.CORE_OS
  DEFAULT_IMAGE_PROJECT = 'cos-cloud'

  def PrepareVMEnvironment(self):
    """See base class. Remounts /home and /tmp executable."""
    super().PrepareVMEnvironment()
    # COS mounts /home and /tmp with -o noexec, which blocks running benchmark
    # binaries.
    # TODO(user): Support reboots
    self.RemoteCommand('sudo mount -o remount,exec /home')
    self.RemoteCommand('sudo mount -o remount,exec /tmp')
class CosStableBasedGceVirtualMachine(BaseCosBasedGceVirtualMachine):
  """COS stable-channel GCE VM (x86 and ARM image families)."""

  OS_TYPE = os_types.COS
  DEFAULT_X86_IMAGE_FAMILY = 'cos-stable'
  DEFAULT_ARM_IMAGE_FAMILY = 'cos-arm64-stable'
class CosDevBasedGceVirtualMachine(BaseCosBasedGceVirtualMachine):
  """COS dev-channel GCE VM (x86 and ARM image families)."""

  OS_TYPE = os_types.COS_DEV
  DEFAULT_X86_IMAGE_FAMILY = 'cos-dev'
  DEFAULT_ARM_IMAGE_FAMILY = 'cos-arm64-dev'
class Cos117BasedGceVirtualMachine(BaseCosBasedGceVirtualMachine):
  """COS 117 LTS GCE VM (x86 and ARM image families)."""

  OS_TYPE = os_types.COS117
  DEFAULT_X86_IMAGE_FAMILY = 'cos-117-lts'
  DEFAULT_ARM_IMAGE_FAMILY = 'cos-arm64-117-lts'
class Cos113BasedGceVirtualMachine(BaseCosBasedGceVirtualMachine):
  """COS 113 LTS GCE VM (x86 and ARM image families)."""

  OS_TYPE = os_types.COS113
  DEFAULT_X86_IMAGE_FAMILY = 'cos-113-lts'
  DEFAULT_ARM_IMAGE_FAMILY = 'cos-arm64-113-lts'
class Cos109BasedGceVirtualMachine(BaseCosBasedGceVirtualMachine):
  """COS 109 LTS GCE VM (x86 and ARM image families)."""

  OS_TYPE = os_types.COS109
  DEFAULT_X86_IMAGE_FAMILY = 'cos-109-lts'
  DEFAULT_ARM_IMAGE_FAMILY = 'cos-arm64-109-lts'
class Cos105BasedGceVirtualMachine(BaseCosBasedGceVirtualMachine):
  """COS 105 LTS GCE VM (x86 and ARM image families)."""

  OS_TYPE = os_types.COS105
  DEFAULT_X86_IMAGE_FAMILY = 'cos-105-lts'
  DEFAULT_ARM_IMAGE_FAMILY = 'cos-arm64-105-lts'
class CoreOsBasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.CoreOsMixin
):
  """Fedora CoreOS GCE VM; gVNIC support is disabled for this class."""

  DEFAULT_X86_IMAGE_FAMILY = 'fedora-coreos-stable'
  DEFAULT_IMAGE_PROJECT = 'fedora-coreos-cloud'
  SUPPORTS_GVNIC = False

  def __init__(self, vm_spec):
    super().__init__(vm_spec)
    # Fedora CoreOS only creates the core user
    self.user_name = 'core'
class Ubuntu2004BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.Ubuntu2004Mixin
):
  """Ubuntu 20.04 LTS GCE VM using the ubuntu-os-cloud image project."""

  DEFAULT_X86_IMAGE_FAMILY = 'ubuntu-2004-lts'
  DEFAULT_IMAGE_PROJECT = 'ubuntu-os-cloud'
class Ubuntu2204BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.Ubuntu2204Mixin
):
  """Ubuntu 22.04 LTS GCE VM using the ubuntu-os-cloud image project."""

  DEFAULT_X86_IMAGE_FAMILY = 'ubuntu-2204-lts'
  DEFAULT_IMAGE_PROJECT = 'ubuntu-os-cloud'
class Ubuntu2404BasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.Ubuntu2404Mixin
):
  """Ubuntu 24.04 LTS GCE VM; the family name carries the amd64 suffix."""

  DEFAULT_X86_IMAGE_FAMILY = 'ubuntu-2404-lts-amd64'
  DEFAULT_IMAGE_PROJECT = 'ubuntu-os-cloud'
class Debian12DeepLearningBasedGceVirtualMachine(
    BaseLinuxGceVirtualMachine, linux_vm.Debian12DLMixin
):
  """Debian12 based Deeplearning image."""

  # This is a self-made image that follows instructions from
  # https://github.com/GoogleCloudPlatform/cluster-toolkit/blob/main/examples/machine-learning/a3-megagpu-8g/slurm-a3mega-image.yaml
  DEFAULT_IMAGE_PROJECT = ''
  DEFAULT_X86_IMAGE_FAMILY = 'slurm-a3mega'

  def __init__(self, vm_spec):
    """Initialize a Debian12 Base DLVM virtual machine.

    Args:
      vm_spec: virtual_machine.BaseVirtualMachineSpec object of the vm.

    Raises:
      ValueError: If an incompatible vm_spec is passed.
    """
    super().__init__(vm_spec)
    # Mark slurm and the CUDA toolkit as already installed so PKB does not
    # attempt to reinstall them (the image presumably ships with both).
    self._installed_packages.add('slurm')
    self._installed_packages.add('cuda_toolkit')

  def PrepareVMEnvironment(self):
    """See base class. Installs tcpxo and writes a slurm cgroup config."""
    super().PrepareVMEnvironment()
    self.Install('tcpxo')
    self.RemoteCommand('sudo mkdir -p /etc/slurm/', ignore_failure=True)
    # Build cgroup.conf in the home directory, then move it into place.
    # NOTE(review): slurm documents this key as 'CgroupPlugin' (capitalized);
    # confirm 'cgroupPlugin' is accepted before relying on it.
    self.RemoteCommand(
        'echo "cgroupPlugin=cgroup/v2" >> cgroup.conf; '
        'echo "ConstrainCores=no" >> cgroup.conf; '
        'echo "ConstrainRAMSpace=no" >> cgroup.conf; '
    )
    self.RemoteCommand('sudo mv cgroup.conf /etc/slurm/')
def GenerateDownloadPreprovisionedDataCommand(
    install_path: str, module_name: str, filename: str
) -> str:
  """Builds the gcloud command that downloads preprovisioned data.

  Args:
    install_path: Path to install the module.
    module_name: Name of the module to download.
    filename: Filename of the module. Usually a zip file.

  Returns:
    The gcloud command to run.
  """
  source = (
      f'gs://{FLAGS.gcp_preprovisioned_data_bucket}/{module_name}/{filename}'
  )
  destination = posixpath.join(install_path, filename)
  return f'gcloud storage -q cp {source} {destination}'
def GenerateStatPreprovisionedDataCommand(
    module_name: str, filename: str
) -> str:
  """Builds the gcloud command that checks for preprovisioned data.

  Args:
    module_name: Name of the module to download.
    filename: Filename of the module. Usually a zip file.

  Returns:
    The gcloud command to run.
  """
  bucket = FLAGS.gcp_preprovisioned_data_bucket
  return f'gcloud storage ls gs://{bucket}/{module_name}/{filename}'