perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py (397 lines of code) (raw):
# Copyright 2018 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains classes/functions related to GKE (Google Kubernetes Engine)."""
import json
import logging
import math
import os
import re
import typing
from typing import Any
from absl import flags
from perfkitbenchmarker import container_service
from perfkitbenchmarker import data
from perfkitbenchmarker import errors
from perfkitbenchmarker import kubernetes_helper
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker.configs import container_spec as container_spec_lib
from perfkitbenchmarker.providers.gcp import flags as gcp_flags
from perfkitbenchmarker.providers.gcp import gce_disk
from perfkitbenchmarker.providers.gcp import gce_virtual_machine
from perfkitbenchmarker.providers.gcp import util
FLAGS = flags.FLAGS
NVIDIA_DRIVER_SETUP_DAEMON_SET_SCRIPT = 'https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded.yaml'
NVIDIA_UNRESTRICTED_PERMISSIONS_DAEMON_SET = (
'nvidia_unrestricted_permissions_daemonset.yml'
)
SERVICE_ACCOUNT_PATTERN = r'.*((?<!iam)|{project}.iam).gserviceaccount.com'
ONE_HOUR = 60 * 60
def _CalculateCidrSize(nodes: int) -> int:
# Defaults are used for pod and services CIDR ranges:
# https://cloud.google.com/kubernetes-engine/docs/concepts/alias-ips#cluster_sizing_secondary_range_svcs)
# Each node requires a /24 CIDR range for pods
# The cluster requires a /20 CIDR range for services
# So 2^(32 - nodes) - 2^(32 - 20) >= 2^(32 - 24) * CIDR
# OR CIDR <= 32 - log2(2^8 * nodes + 2^12)
cidr_size = int(32 - math.log2((nodes << 8) + (1 << 12)))
# /19 is narrowest CIDR range GKE supports
return min(cidr_size, 19)
class GoogleArtifactRegistry(container_service.BaseContainerRegistry):
"""Class for building/storing container images on GCP w/ Artifact Registry."""
CLOUD = provider_info.GCP
def __init__(self, registry_spec):
super().__init__(registry_spec)
self.project = self.project or util.GetDefaultProject()
self.region = util.GetRegionFromZone(self.zone)
# Remove from gcloud commands
self.zone = None
self.endpoint = f'{self.region}-docker.pkg.dev'
def GetFullRegistryTag(self, image: str) -> str:
"""Gets the full tag of the image."""
project = self.project.replace(':', '/')
full_tag = f'{self.endpoint}/{project}/{self.name}/{image}'
return full_tag
def Login(self):
"""Configures docker to be able to push to remote repo."""
util.GcloudCommand(self, 'auth', 'configure-docker', self.endpoint).Issue()
def _Create(self):
util.GcloudCommand(
self,
'artifacts',
'repositories',
'create',
self.name,
'--repository-format=docker',
f'--location={self.region}',
).Issue()
def _Delete(self):
util.GcloudCommand(
self,
'artifacts',
'repositories',
'delete',
self.name,
f'--location={self.region}',
).Issue()
def RemoteBuild(self, image: container_service.ContainerImage):
"""Builds the image remotely."""
if not gcp_flags.CONTAINER_REMOTE_BUILD_CONFIG.value:
full_tag = self.GetFullRegistryTag(image.name)
else:
full_tag = gcp_flags.CONTAINER_REMOTE_BUILD_CONFIG.value
build_cmd = util.GcloudCommand(
self, 'builds', 'submit', '--tag', full_tag, image.directory
)
build_cmd.Issue(timeout=None)
class BaseGkeCluster(container_service.KubernetesCluster):
"""Base class for regular & Autopilot GKE clusters."""
def __init__(self, spec: container_spec_lib.ContainerClusterSpec):
super().__init__(spec)
self.project: str = spec.vm_spec.project or FLAGS.project
self.cluster_version: str = FLAGS.container_cluster_version
self.release_channel: str | None = gcp_flags.CONTAINER_RELEASE_CHANNEL.value
self.use_application_default_credentials: bool = True
self.zones = (
self.default_nodepool.zone and self.default_nodepool.zone.split(',')
)
if not self.zones:
raise errors.Config.MissingOption(
'container_cluster.vm_spec.GCP.zone is required.'
)
elif len(self.zones) == 1 and util.IsRegion(self.default_nodepool.zone):
self.region: str = self.default_nodepool.zone
self.zones = []
logging.info(
"Interpreting zone '%s' as a region", self.default_nodepool.zone
)
else:
self.region: str = util.GetRegionFromZone(self.zones[0])
def GetResourceMetadata(self) -> dict[str, Any]:
"""Returns a dict containing metadata about the cluster.
Returns:
dict mapping string property key to value.
"""
metadata = super().GetResourceMetadata()
metadata['project'] = self.project
if self.release_channel:
metadata['release_channel'] = self.release_channel
return metadata
def _GcloudCommand(self, *args, **kwargs) -> util.GcloudCommand:
"""Creates a gcloud command."""
return util.GcloudCommand(self, *args, **kwargs)
def _RunClusterCreateCommand(self, cmd: util.GcloudCommand):
"""Adds flags to the cluster create command and runs it."""
# All combinations of cluster-version and release-channel are supported.
# Specifying one uses default for the other. Specifying both can be needed
# as some versions are only supported in some release channels.
if self.cluster_version:
cmd.flags['cluster-version'] = self.cluster_version
if self.release_channel:
if FLAGS.gke_enable_alpha:
raise errors.Config.InvalidValue(
'Kubernetes Alpha is not compatible with release channels'
)
cmd.flags['release-channel'] = self.release_channel
if FLAGS.gke_enable_alpha:
cmd.args.append('--enable-kubernetes-alpha')
cmd.args.append('--no-enable-autorepair')
cmd.flags['monitoring'] = 'SYSTEM,API_SERVER,SCHEDULER,CONTROLLER_MANAGER'
user = util.GetDefaultUser()
if FLAGS.gcp_service_account:
cmd.flags['service-account'] = FLAGS.gcp_service_account
# Matches service accounts that either definitely belongs to this project or
# are a GCP managed service account like the GCE default service account,
# which we can't tell to which project they belong.
elif re.match(SERVICE_ACCOUNT_PATTERN, user):
logging.info(
'Re-using configured service-account for GKE Cluster: %s', user
)
cmd.flags['service-account'] = user
self.use_application_default_credentials = False
else:
logging.info('Using default GCE service account for GKE cluster')
cmd.flags['scopes'] = 'cloud-platform'
self._IssueResourceCreationCommand(cmd)
def _IssueResourceCreationCommand(self, cmd: util.GcloudCommand):
"""Issues a command to gcloud to create resources."""
# This command needs a long timeout due to the many minutes it
# can take to provision a large GPU-accelerated GKE cluster.
_, stderr, retcode = cmd.Issue(timeout=ONE_HOUR, raise_on_failure=False)
if retcode:
util.CheckGcloudResponseKnownFailures(stderr, retcode)
raise errors.Resource.CreationError(stderr)
def _PostCreate(self):
"""Acquires cluster authentication."""
cmd = self._GcloudCommand(
'container', 'clusters', 'get-credentials', self.name
)
env = os.environ.copy()
env['KUBECONFIG'] = FLAGS.kubeconfig
cmd.IssueRetryable(env=env)
super()._PostCreate()
def _IsDeleting(self):
cmd = self._GcloudCommand('container', 'clusters', 'describe', self.name)
stdout, _, _ = cmd.Issue(raise_on_failure=False)
return True if stdout else False
def _Delete(self):
"""Deletes the cluster."""
super()._Delete()
cmd = self._GcloudCommand('container', 'clusters', 'delete', self.name)
cmd.args.append('--async')
_, err, _ = cmd.Issue(raise_on_failure=False)
if 'Please wait and try again' in err:
raise errors.Resource.RetryableDeletionError(err)
def _Exists(self):
"""Returns True if the cluster exits."""
cmd = self._GcloudCommand('container', 'clusters', 'describe', self.name)
_, _, retcode = cmd.Issue(raise_on_failure=False)
return retcode == 0
def GetDefaultStorageClass(self) -> str:
"""Gets the default storage class for the provider."""
# https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/gce-pd-csi-driver
# PD-SSD
return 'premium-rwo'
class GkeCluster(BaseGkeCluster):
"""Class representing a Google Kubernetes Engine cluster."""
CLOUD = provider_info.GCP
def __init__(self, spec: container_spec_lib.ContainerClusterSpec):
super().__init__(spec)
# Update the environment for gcloud commands:
if gcp_flags.GKE_API_OVERRIDE.value:
os.environ['CLOUDSDK_API_ENDPOINT_OVERRIDES_CONTAINER'] = (
gcp_flags.GKE_API_OVERRIDE.value
)
self.enable_nccl_fast_socket = False
if gcp_flags.GKE_NCCL_FAST_SOCKET.value:
if self.nodepools:
self.enable_nccl_fast_socket = True
else:
raise errors.Config.InvalidValue(
'NCCL fast socket is only supported on secondary node pools.'
)
self.image_type = gcp_flags.GKE_IMAGE_TYPE.value
def InitializeNodePoolForCloud(
self,
vm_config: virtual_machine.BaseVirtualMachine,
nodepool_config: container_service.BaseNodePoolConfig,
):
vm_config = typing.cast(gce_virtual_machine.GceVirtualMachine, vm_config)
nodepool_config.disk_type = vm_config.boot_disk.boot_disk_type
nodepool_config.disk_size = vm_config.boot_disk.boot_disk_size
nodepool_config.max_local_disks = vm_config.max_local_disks
nodepool_config.gpu_type = vm_config.gpu_type
nodepool_config.gpu_count = vm_config.gpu_count
nodepool_config.threads_per_core = vm_config.threads_per_core
nodepool_config.gce_tags = vm_config.gce_tags
nodepool_config.min_cpu_platform = vm_config.min_cpu_platform
nodepool_config.network = vm_config.network
nodepool_config.cpus: int = vm_config.cpus
nodepool_config.memory_mib: int = vm_config.memory_mib
def _GcloudCommand(self, *args, **kwargs) -> util.GcloudCommand:
"""Fix zone and region."""
cmd = super()._GcloudCommand(*args, **kwargs)
if len(self.zones) != 1:
del cmd.flags['zone']
cmd.flags['region'] = self.region
return cmd
def GetResourceMetadata(self) -> dict[str, Any]:
"""Returns a dict containing metadata about the cluster.
Returns:
dict mapping string property key to value.
"""
result = super().GetResourceMetadata()
result['boot_disk_type'] = self.default_nodepool.disk_type
result['boot_disk_size'] = self.default_nodepool.disk_size
if self.default_nodepool.max_local_disks:
result['gce_local_ssd_count'] = self.default_nodepool.max_local_disks
# TODO(pclay): support NVME when it leaves alpha
# Also consider moving FLAGS.gce_ssd_interface into the vm_spec.
result['gce_local_ssd_interface'] = gce_virtual_machine.SCSI
result['gke_nccl_fast_socket'] = self.enable_nccl_fast_socket
if 'nccl' in self.nodepools:
result['gpu_type'] = self.nodepools['nccl'].gpu_type
result['gpu_count'] = self.nodepools['nccl'].gpu_count
if self.image_type:
result['image_type'] = self.image_type
return result
def _Create(self):
"""Creates the cluster."""
cmd = self._GcloudCommand('container', 'clusters', 'create', self.name)
if self.default_nodepool.network:
cmd.flags['network'] = self.default_nodepool.network.network_resource.name
if gcp_flags.GKE_ENABLE_SHIELDED_NODES.value:
cmd.args.append('--enable-shielded-nodes')
else:
cmd.args.append('--no-enable-shielded-nodes')
if not self.release_channel:
cmd.args.append('--no-enable-autoupgrade')
self._AddNodeParamsToCmd(
self.default_nodepool,
cmd,
)
if (
self.min_nodes != self.default_nodepool.num_nodes
or self.max_nodes != self.default_nodepool.num_nodes
):
cmd.args.append('--enable-autoscaling')
cmd.flags['max-nodes'] = self.max_nodes
cmd.flags['min-nodes'] = self.min_nodes
cmd.flags['cluster-ipv4-cidr'] = f'/{_CalculateCidrSize(self.max_nodes)}'
cmd.flags['metadata'] = util.MakeFormattedDefaultTags()
self._RunClusterCreateCommand(cmd)
self._CreateNodePools()
def _CreateNodePools(self):
"""Creates additional nodepools for the cluster, if applicable."""
for name, nodepool in self.nodepools.items():
cmd = self._GcloudCommand(
'container', 'node-pools', 'create', name, '--cluster', self.name
)
self._AddNodeParamsToCmd(
nodepool,
cmd,
)
self._IssueResourceCreationCommand(cmd)
def _AddNodeParamsToCmd(
self,
nodepool_config: container_service.BaseNodePoolConfig,
cmd: util.GcloudCommand,
):
"""Modifies cmd to include node specific command arguments."""
# Apply labels to all nodepools.
cmd.flags['labels'] = util.MakeFormattedDefaultTags()
# Allow a long timeout due to the many minutes it can take to provision a
# large GPU-accelerated GKE cluster.
# Parameter is not documented well but is available in CLI.
cmd.flags['timeout'] = ONE_HOUR
if nodepool_config.gpu_count:
if 'a2-' not in nodepool_config.machine_type:
cmd.flags['accelerator'] = (
gce_virtual_machine.GenerateAcceleratorSpecString(
nodepool_config.gpu_type, nodepool_config.gpu_count
)
)
gce_tags = FLAGS.gce_tags
if nodepool_config.gce_tags:
gce_tags = nodepool_config.gce_tags
if gce_tags:
cmd.flags['tags'] = ','.join(gce_tags)
if nodepool_config.min_cpu_platform:
cmd.flags['min-cpu-platform'] = nodepool_config.min_cpu_platform
if nodepool_config.threads_per_core:
# TODO(user): Remove when threads-per-core is available in GA
cmd.use_alpha_gcloud = True
cmd.flags['threads-per-core'] = nodepool_config.threads_per_core
if nodepool_config.disk_size:
cmd.flags['disk-size'] = nodepool_config.disk_size
if nodepool_config.disk_type:
cmd.flags['disk-type'] = nodepool_config.disk_type
if nodepool_config.max_local_disks:
# TODO(pclay): Switch to local-ssd-volumes which support NVME when it
# leaves alpha. See
# https://cloud.google.com/sdk/gcloud/reference/alpha/container/clusters/create
cmd.flags['local-ssd-count'] = nodepool_config.max_local_disks
cmd.flags['num-nodes'] = nodepool_config.num_nodes
# zone may be split a comma separated list
if nodepool_config.zone:
cmd.flags['node-locations'] = nodepool_config.zone
if nodepool_config.machine_type is None:
cmd.flags['machine-type'] = 'custom-{}-{}'.format(
nodepool_config.cpus, nodepool_config.memory_mib
)
else:
cmd.flags['machine-type'] = nodepool_config.machine_type
if FLAGS.gke_enable_gvnic:
cmd.args.append('--enable-gvnic')
else:
cmd.args.append('--no-enable-gvnic')
if (
self.enable_nccl_fast_socket
and nodepool_config.name != container_service.DEFAULT_NODEPOOL
):
cmd.args.append('--enable-fast-socket')
if FLAGS.gke_node_system_config is not None:
cmd.flags['system-config-from-file'] = FLAGS.gke_node_system_config
if nodepool_config.sandbox_config is not None:
cmd.flags['sandbox'] = nodepool_config.sandbox_config.ToSandboxFlag()
if self.image_type:
cmd.flags['image-type'] = self.image_type
cmd.flags['node-labels'] = f'pkb_nodepool={nodepool_config.name}'
def _PostCreate(self):
"""Installs nvidia drivers if needed."""
super()._PostCreate()
# GKE does not wait for kube-dns by default
logging.info('Waiting for kube-dns')
self.WaitForResource(
'deployment/kube-dns',
condition_name='Available',
namespace='kube-system',
)
should_install_nvidia_drivers = self.default_nodepool.gpu_count or any(
pool.gpu_count for pool in self.nodepools.values()
)
if should_install_nvidia_drivers:
kubernetes_helper.CreateFromFile(NVIDIA_DRIVER_SETUP_DAEMON_SET_SCRIPT)
kubernetes_helper.CreateFromFile(
data.ResourcePath(NVIDIA_UNRESTRICTED_PERMISSIONS_DAEMON_SET)
)
def _GetInstanceGroups(self):
cmd = self._GcloudCommand('container', 'node-pools', 'list')
cmd.flags['cluster'] = self.name
stdout, _, _ = cmd.Issue()
json_output = json.loads(stdout)
instance_groups = []
for node_pool in json_output:
for group_url in node_pool['instanceGroupUrls']:
instance_groups.append(group_url.split('/')[-1]) # last url part
return instance_groups
def LabelDisks(self):
"""Sets common labels on PVCs.
GKE does this in the background every hour. Do it immediately in case the
cluster is deleted within that hour.
https://cloud.google.com/kubernetes-engine/docs/how-to/creating-managing-labels#label_propagation
"""
pvcs = self._GetPvcs()
for pvc in pvcs:
gce_disk.AddLabels(self, pvc['spec']['volumeName'])
def ResizeNodePool(
self, new_size: int, node_pool: str = container_service.DEFAULT_NODEPOOL
):
"""Changes the number of nodes in the node pool."""
cmd = self._GcloudCommand('container', 'clusters', 'resize', self.name)
cmd.flags['num-nodes'] = new_size
# updates default node pool by default
if node_pool != container_service.DEFAULT_NODEPOOL:
cmd.flags['node-pool'] = node_pool
cmd.Issue()
class GkeAutopilotCluster(BaseGkeCluster):
"""Class representing an Autopilot GKE cluster, which has no nodepools."""
CLOUD = provider_info.GCP
CLUSTER_TYPE = 'Autopilot'
def __init__(self, spec: container_spec_lib.ContainerClusterSpec):
super().__init__(spec)
# Nodepools are not supported for Autopilot clusters, but default vm_spec
# still used for pod spec input.
self.nodepools = {}
def InitializeNodePoolForCloud(
self,
vm_config: virtual_machine.BaseVirtualMachine,
nodepool_config: container_service.BaseNodePoolConfig,
):
nodepool_config.network = vm_config.network
return nodepool_config
def _GcloudCommand(self, *args, **kwargs) -> util.GcloudCommand:
"""Creates a gcloud command."""
cmd = super()._GcloudCommand(*args, **kwargs)
if 'zone' in cmd.flags:
del cmd.flags['zone']
cmd.flags['region'] = self.region
return cmd
def _Create(self):
"""Creates the cluster."""
cmd = self._GcloudCommand(
'container',
'clusters',
'create-auto',
self.name,
'--no-autoprovisioning-enable-insecure-kubelet-readonly-port',
)
if self.default_nodepool.network:
cmd.flags['network'] = self.default_nodepool.network.network_resource.name
cmd.flags['labels'] = util.MakeFormattedDefaultTags()
self._RunClusterCreateCommand(cmd)
def GetResourceMetadata(self) -> dict[str, Any]:
metadata = super().GetResourceMetadata()
metadata['zone'] = self.zone
metadata['region'] = self.region
# Override node specific metadata set in parent.
metadata['machine_type'] = self.CLUSTER_TYPE
metadata['size'] = self.CLUSTER_TYPE
metadata['nodepools'] = self.CLUSTER_TYPE
return metadata
def GetNodeSelectors(self) -> list[str]:
"""Node selectors for instance capabilites in AutoPilot clusters."""
selectors = []
# https://cloud.google.com/kubernetes-engine/docs/how-to/autopilot-gpus#request-gpus
if virtual_machine.GPU_TYPE.value:
gpu_count = virtual_machine.GPU_COUNT.value or 1
gpu_type = f'nvidia-{virtual_machine.GPU_TYPE.value}'
gpu_driver_version = gcp_flags.GKE_GPU_DRIVER_VERSION.value
selectors += [
'cloud.google.com/gke-accelerator: ' + gpu_type,
# Quote to avoid YAML parsing as int.
f"cloud.google.com/gke-accelerator-count: '{gpu_count}'",
(
'cloud.google.com/gke-gpu-driver-version:'
f" '{gpu_driver_version}'"
),
]
return selectors
def ResizeNodePool(
self, new_size: int, node_pool: str = container_service.DEFAULT_NODEPOOL
):
raise NotImplementedError('Autopilot clusters do not support resizing.')