perfkitbenchmarker/container_service.py (1,083 lines of code) (raw):
# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains classes related to managed container services.
For now this just consists of a base cluster class that other container
services will be derived from and a Kubernetes specific variant. This enables
users to run PKB VM based benchmarks on container providers (e.g. Kubernetes)
without pre-provisioning container clusters. In the future, this may be
expanded to support first-class container benchmarks.
"""
import calendar
import collections
import dataclasses
import datetime
import functools
import ipaddress
import itertools
import json
import logging
import multiprocessing
from multiprocessing import synchronize
import os
import re
import time
from typing import Any, Callable, Iterator, Optional, Sequence
from absl import flags
import jinja2
from perfkitbenchmarker import context
from perfkitbenchmarker import data
from perfkitbenchmarker import errors
from perfkitbenchmarker import events
from perfkitbenchmarker import kubernetes_helper
from perfkitbenchmarker import os_types
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import resource
from perfkitbenchmarker import sample
from perfkitbenchmarker import units
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.configs import container_spec as container_spec_lib
from perfkitbenchmarker.sample import Sample
import requests
import yaml
BenchmarkSpec = Any # benchmark_spec lib imports this module.
KUBERNETES = container_spec_lib.KUBERNETES
DEFAULT_NODEPOOL = container_spec_lib.DEFAULT_NODEPOOL
FLAGS = flags.FLAGS
flags.DEFINE_string(
'kubeconfig',
None,
'Path to kubeconfig to be used by kubectl. '
"If unspecified, it will be set to a file in this run's "
'temporary directory.',
)
flags.DEFINE_string('kubectl', 'kubectl', 'Path to kubectl tool')
flags.DEFINE_boolean(
'local_container_build',
False,
'Force container images to be built locally rather than '
'just as a fallback if there is no remote image builder '
'associated with the registry.',
)
flags.DEFINE_boolean(
'static_container_image',
True,
'Whether container images are static (i.e. are not '
'managed by PKB). If this is set, PKB will accept the '
'image as fully qualified (including repository) and will '
'not attempt to build it.',
)
flags.DEFINE_integer(
'container_cluster_num_vms',
None,
'Number of nodes in the cluster. Defaults to container_cluster.vm_count',
)
flags.DEFINE_string(
'container_cluster_type', KUBERNETES, 'The type of container cluster.'
)
flags.DEFINE_string(
'container_cluster_version',
None,
'Optional version flag to pass to the cluster create '
'command. If not specified, the cloud-specific container '
'implementation will chose an appropriate default.',
)
_CONTAINER_CLUSTER_ARCHITECTURE = flags.DEFINE_list(
'container_cluster_architecture',
['linux/amd64'],
'The architecture(s) that the container cluster uses. '
'Defaults to linux/amd64',
)
_K8S_INGRESS = """
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: {service_name}-ingress
spec:
backend:
serviceName: {service_name}
servicePort: 8080
"""
RESOURCE_DELETE_SLEEP_SECONDS = 5
_RETRYABLE_KUBECTL_ERRORS = [
(
'"unable to decode an event from the watch stream: http2: client'
' connection lost"'
),
'read: connection reset by peer',
'Unable to connect to the server: dial tcp',
'Unable to connect to the server: net/http: TLS handshake timeout',
]
class ContainerException(errors.Error):
  """Base error for failures while creating or running a container."""
class FatalContainerException(
    errors.Resource.CreationError, ContainerException
):
  """Container creation/execution error that should not be retried."""
class RetriableContainerException(
    errors.Resource.RetryableCreationError, ContainerException
):
  """Container creation/execution error that is safe to retry."""
def RunKubectlCommand(command: list[str], **kwargs) -> tuple[str, str, int]:
  """Runs a kubectl command against the cluster named in --kubeconfig.

  Args:
    command: kubectl arguments, excluding the binary and kubeconfig flags.
    **kwargs: Forwarded to vm_util.IssueCommand.

  Returns:
    Tuple of (stdout, stderr, retcode).
  """
  # IssueCommand defaults stack_level to 1, so adding 1 skips this function
  # when attributing the log line to a caller.
  kwargs['stack_level'] = kwargs.get('stack_level', 1) + 1
  cmd = [FLAGS.kubectl, '--kubeconfig', FLAGS.kubeconfig] + command
  orig_suppress_failure = kwargs.get('suppress_failure')

  def _DetectTimeoutViaSuppressFailure(stdout, stderr, retcode):
    """Translates transient kubectl connection errors into timeout errors."""
    if retcode != 0 and any(
        error_substring in stderr
        for error_substring in _RETRYABLE_KUBECTL_ERRORS
    ):
      # Raise timeout error regardless of raise_on_failure - as the intended
      # semantics is to ignore expected errors caused by invoking the
      # command not errors from PKB infrastructure.
      if kwargs.get('raise_on_timeout', True):
        raise errors.VmUtil.IssueCommandTimeoutError(stderr)
    # Fall back to any user-supplied suppression; otherwise do not suppress.
    if orig_suppress_failure is not None:
      return orig_suppress_failure(stdout, stderr, retcode)
    return False

  kwargs['suppress_failure'] = _DetectTimeoutViaSuppressFailure
  return vm_util.IssueCommand(cmd, **kwargs)
def RunRetryableKubectlCommand(
    run_cmd: list[str], timeout: int | None = None, **kwargs
) -> tuple[str, str, int]:
  """Runs a kubectl command, retrying somewhat expected errors.

  Transient kubectl connection failures (see _RETRYABLE_KUBECTL_ERRORS) are
  surfaced as timeout errors by RunKubectlCommand and retried here.

  Args:
    run_cmd: kubectl arguments, excluding the binary and kubeconfig flags.
    timeout: Overall timeout, in seconds, across all retries.
    **kwargs: Forwarded to RunKubectlCommand.

  Returns:
    Tuple of (stdout, stderr, retcode).

  Raises:
    ValueError: If `raise_on_timeout=True` is passed, since timeouts must
      remain retryable inside this helper.
  """
  if kwargs.get('raise_on_timeout'):
    raise ValueError(
        'RunRetryableKubectlCommand does not allow `raise_on_timeout=True`'
        ' (since timeouts are retryable).'
    )
  # IssueCommand defaults stack_level to 1, so 2 skips this function.
  kwargs['stack_level'] = kwargs.get('stack_level', 1) + 1

  @vm_util.Retry(
      timeout=timeout,
      retryable_exceptions=(errors.VmUtil.IssueCommandTimeoutError,),
  )
  def _RunRetryablePart(run_cmd: list[str], **kwargs):
    """Inner function retries command so timeout can be passed to decorator."""
    kwargs['stack_level'] += 1
    return RunKubectlCommand(run_cmd, raise_on_timeout=True, **kwargs)

  return _RunRetryablePart(run_cmd, timeout=timeout, **kwargs)
class BaseContainer(resource.BaseResource):
  """Class representing a single container."""

  def __init__(
      self, container_spec: container_spec_lib.ContainerSpec | None = None
  ):
    # The spec is nominally optional only so subclasses can cooperate in
    # multiple inheritance; a real spec must always be supplied.
    assert container_spec
    super().__init__()
    spec = container_spec
    self.cpus: float = spec.cpus
    self.memory: int = spec.memory
    self.command: list[str] = spec.command
    self.image: str = spec.image
    # Set once the container is scheduled and has an address.
    self.ip_address: str | None = None

  def WaitForExit(self, timeout: int = 1200) -> dict[str, Any]:
    """Blocks until the container has finished successfully.

    Args:
      timeout: The timeout to wait in seconds

    Raises:
      FatalContainerException: If the container fails
      RetriableContainerException: If the container times out without
        succeeding.
    """
    raise NotImplementedError()

  def GetLogs(self):
    """Returns the logs from the container."""
    raise NotImplementedError()
class BaseContainerService(resource.BaseResource):
  """Class representing a service backed by containers."""

  def __init__(self, container_spec: container_spec_lib.ContainerSpec):
    super().__init__()
    spec = container_spec
    self.cpus: float = spec.cpus
    self.memory: int = spec.memory
    self.command: list[str] = spec.command
    self.image: str = spec.image
    self.container_port: int = spec.container_port
    # Populated once the service is deployed and reachable.
    self.ip_address: str | None = None
    self.port: int | None = None
    self.host_header: str | None = None
class ContainerImage:
  """Simple class for tracking container image names and source locations."""

  def __init__(self, name: str):
    self.name: str = name
    # The image's build context is the directory holding its Dockerfile.
    dockerfile = os.path.join('docker', name, 'Dockerfile')
    self.directory: str = os.path.dirname(data.ResourcePath(dockerfile))
class BaseContainerRegistry(resource.BaseResource):
  """Base class for container image registries.

  Attributes:
    zone: Zone of the registry (falls back to the container cluster's zone).
    project: Project owning the registry (falls back to the cluster's).
    name: Name of the registry.
    local_build_times: Map of image name -> seconds spent building locally.
    remote_build_times: Map of image name -> seconds spent building remotely.
  """

  RESOURCE_TYPE = 'BaseContainerRegistry'
  # Set by cloud-specific subclasses.
  CLOUD: str

  def __init__(self, registry_spec: container_spec_lib.ContainerRegistrySpec):
    super().__init__()
    benchmark_spec: BenchmarkSpec = context.GetThreadBenchmarkSpec()
    # Fall back to the benchmark's container cluster location/project when the
    # registry spec does not specify them.
    container_cluster = getattr(benchmark_spec, 'container_cluster', None)
    zone = getattr(container_cluster, 'zone', None)
    project = getattr(container_cluster, 'project', None)
    self.zone: str = registry_spec.zone or zone
    self.project: str = registry_spec.project or project
    self.name: str = registry_spec.name or 'pkb%s' % FLAGS.run_uri
    self.local_build_times: dict[str, float] = {}
    self.remote_build_times: dict[str, float] = {}
    self.metadata.update({'cloud': self.CLOUD})

  def _Create(self):
    """Creates the image registry."""
    pass

  def _Delete(self):
    """Deletes the image registry."""
    pass

  def GetSamples(self):
    """Returns image build related samples."""
    samples = []
    base_metadata = self.GetResourceMetadata()
    all_builds = itertools.chain(
        (('local', image, t) for image, t in self.local_build_times.items()),
        (('remote', image, t) for image, t in self.remote_build_times.items()),
    )
    for build_type, image_name, build_time in all_builds:
      # Give each sample its own metadata dict. Previously a single shared
      # dict was mutated in place, so every emitted sample referenced the
      # metadata of whichever image was processed last.
      metadata = dict(base_metadata)
      metadata.update({
          'build_type': build_type,
          'image': image_name,
      })
      samples.append(
          sample.Sample('Image Build Time', build_time, 'seconds', metadata)
      )
    return samples

  def GetFullRegistryTag(self, image: str):
    """Returns the full name of the image for the registry.

    Args:
      image: The PKB name of the image (string).
    """
    raise NotImplementedError()

  def PrePush(self, image: ContainerImage):
    """Prepares registry to push a given image."""
    pass

  def RemoteBuild(self, image: ContainerImage):
    """Build the image remotely.

    Args:
      image: Instance of ContainerImage representing the image to build.
    """
    raise NotImplementedError()

  def Login(self):
    """Log in to the registry (in order to push to it)."""
    raise NotImplementedError()

  def LocalBuildAndPush(self, image: ContainerImage):
    """Build the image locally and push to registry.

    Assumes we are already authenticated with the registry from self.Login.
    Building and pushing done in one command to support multiarch images
    https://github.com/docker/buildx/issues/59

    Args:
      image: The image to build.
    """
    full_tag = self.GetFullRegistryTag(image.name)
    # Multiarch images require buildx create
    # https://github.com/docker/build-push-action/issues/302
    vm_util.IssueCommand(['docker', 'buildx', 'create', '--use'])
    cmd = ['docker', 'buildx', 'build']
    if _CONTAINER_CLUSTER_ARCHITECTURE.value:
      cmd += ['--platform', ','.join(_CONTAINER_CLUSTER_ARCHITECTURE.value)]
    cmd += ['--no-cache', '--push', '-t', full_tag, image.directory]
    vm_util.IssueCommand(cmd, timeout=None)
    vm_util.IssueCommand(['docker', 'buildx', 'stop'])

  def GetOrBuild(self, image: str):
    """Finds the image in the registry or builds it.

    TODO(pclay): Add support for build ARGs.

    Args:
      image: The PKB name for the image (string).

    Returns:
      The full image name (including the registry).
    """
    full_image = self.GetFullRegistryTag(image)
    self.Login()
    self._Build(image)
    return full_image

  def _Build(self, image: str):
    """Builds the image and pushes it to the registry if necessary.

    Args:
      image: The PKB name for the image (string).
    """
    # Use a distinct name rather than shadowing the str parameter with a
    # ContainerImage instance.
    container_image = ContainerImage(image)
    if not FLAGS.local_container_build:
      try:
        build_start = time.time()
        # Build the image remotely using an image building service.
        self.RemoteBuild(container_image)
        self.remote_build_times[container_image.name] = (
            time.time() - build_start
        )
        return
      except NotImplementedError:
        # Fall through to a local docker build.
        pass
    # TODO(pclay): Refactor ECR and remove.
    self.PrePush(container_image)
    # Build the image locally using docker.
    build_start = time.time()
    self.LocalBuildAndPush(container_image)
    self.local_build_times[container_image.name] = time.time() - build_start
def GetContainerRegistryClass(cloud: str) -> type[BaseContainerRegistry]:
  """Looks up the container registry class registered for the given cloud."""
  return resource.GetResourceClass(BaseContainerRegistry, CLOUD=cloud)
@events.benchmark_start.connect
def _SetKubeConfig(unused_sender, benchmark_spec: BenchmarkSpec):
  """Sets the value for the kubeconfig flag if it's unspecified."""
  if FLAGS.kubeconfig:
    return
  FLAGS.kubeconfig = vm_util.PrependTempDir(
      'kubeconfig' + str(benchmark_spec.sequence_number)
  )
  # Store the value for subsequent run stages.
  benchmark_spec.config.flags['kubeconfig'] = FLAGS.kubeconfig
def NodePoolName(name: str) -> str:
  """Clean node pool names to be usable by all providers.

  GKE (or k8s?) requires nodepools use alphanumerics and hyphens, and AKS
  requires full alphanumeric, so the underscores PKB likes to use are
  stripped out.
  """
  return ''.join(char for char in name if char != '_')
class BaseNodePoolConfig:
  """A node pool's config, where each node in the node pool has the same config.

  See also: https://cloud.google.com/kubernetes-engine/docs/concepts/node-pools
  """

  def __init__(self, vm_config: virtual_machine.BaseVirtualMachine, name: str):
    # Use Virtual Machine class to resolve VM Spec. Note there is no actual VM;
    # we just use it as a data holder to let VM subclass __init__'s handle
    # parsing specific information like disks out of the spec.
    self.machine_type = vm_config.machine_type
    # Sanitized so it is legal for all providers (underscores stripped).
    self.name = NodePoolName(name)
    self.zone: str = vm_config.zone
    self.sandbox_config: container_spec_lib.SandboxSpec | None = None
    # The following attributes are declared (annotation-only) here and
    # populated later by the owning cluster / cloud-specific initialization.
    self.num_nodes: int
    self.disk_type: str
    self.disk_size: int
    # Defined by google_kubernetes_engine
    self.max_local_disks: int | None
    self.gpu_type: str | None
    self.gpu_count: int | None
    self.threads_per_core: int
    self.gce_tags: list[str]
    self.min_cpu_platform: str
    self.cpus: int
    self.memory_mib: int
class BaseContainerCluster(resource.BaseResource):
  """A cluster that can be used to schedule containers."""

  RESOURCE_TYPE = 'BaseContainerCluster'
  REQUIRED_ATTRS = ['CLOUD', 'CLUSTER_TYPE']
  # Set by cloud-specific subclasses.
  CLOUD: str
  CLUSTER_TYPE: str

  def __init__(self, cluster_spec: container_spec_lib.ContainerClusterSpec):
    """Initializes the cluster config from its spec.

    Args:
      cluster_spec: Spec describing the cluster and its node pools.
    """
    # A static (pre-existing) cluster is treated as user managed: PKB will not
    # create or delete it.
    super().__init__(user_managed=bool(cluster_spec.static_cluster))
    self.name: str = cluster_spec.static_cluster or 'pkb-' + FLAGS.run_uri
    # Go get a BaseVM, to use strictly for config values.
    default_vm_class = virtual_machine.GetVmClass(
        self.CLOUD, os_types.DEFAULT, provider_info.DEFAULT_VM_PLATFORM
    )
    default_vm_config: virtual_machine.BaseVirtualMachine = default_vm_class(
        cluster_spec.vm_spec
    )  # pytype: disable=not-instantiable
    self.default_nodepool = self._InitializeDefaultNodePool(
        cluster_spec, default_vm_config
    )
    self.nodepools: dict[str, BaseNodePoolConfig] = {}
    for name, nodepool_spec in cluster_spec.nodepools.copy().items():
      vm_config: virtual_machine.BaseVirtualMachine = default_vm_class(
          nodepool_spec.vm_spec
      )  # pytype: disable=not-instantiable
      nodepool = self._InitializeNodePool(name, nodepool_spec, vm_config)
      self.nodepools[nodepool.name] = nodepool
    # Autoscaling bounds default to the default nodepool's static size.
    self.min_nodes: int = (
        cluster_spec.min_vm_count or self.default_nodepool.num_nodes
    )
    self.max_nodes: int = (
        cluster_spec.max_vm_count or self.default_nodepool.num_nodes
    )
    # Containers and services deployed on this cluster, keyed by name.
    self.containers: dict[str, list[KubernetesContainer]] = (
        collections.defaultdict(list)
    )
    self.services: dict[str, KubernetesContainerService] = {}

  @property
  def num_nodes(self) -> int:
    """Number of nodes in the default node pool."""
    return self.default_nodepool.num_nodes

  @property
  def zone(self) -> str:
    """Zone of the default node pool."""
    return self.default_nodepool.zone

  def _InitializeDefaultNodePool(
      self,
      cluster_spec: container_spec_lib.ContainerClusterSpec,
      vm_config: virtual_machine.BaseVirtualMachine,
  ) -> BaseNodePoolConfig:
    """Builds the default node pool config from the cluster spec."""
    nodepool_config = BaseNodePoolConfig(
        vm_config,
        DEFAULT_NODEPOOL,
    )
    nodepool_config.num_nodes = cluster_spec.vm_count
    self.InitializeNodePoolForCloud(vm_config, nodepool_config)
    return nodepool_config

  def _InitializeNodePool(
      self,
      name: str,
      nodepool_spec: container_spec_lib.NodepoolSpec,
      vm_config: virtual_machine.BaseVirtualMachine,
  ) -> BaseNodePoolConfig:
    """Builds a named node pool config from its spec."""
    # A nodepool without its own vm_spec inherits the default pool's zone.
    zone = (
        nodepool_spec.vm_spec.zone
        if nodepool_spec.vm_spec
        else self.default_nodepool.zone
    )
    nodepool_config = BaseNodePoolConfig(
        vm_config,
        name,
    )
    nodepool_config.sandbox_config = nodepool_spec.sandbox_config
    nodepool_config.zone = zone
    nodepool_config.num_nodes = nodepool_spec.vm_count
    self.InitializeNodePoolForCloud(vm_config, nodepool_config)
    return nodepool_config

  def InitializeNodePoolForCloud(
      self,
      vm_config: virtual_machine.BaseVirtualMachine,
      nodepool_config: BaseNodePoolConfig,
  ):
    """Override to initialize cloud specific configs."""
    pass

  def DeleteContainers(self):
    """Delete containers belonging to the cluster."""
    for container in itertools.chain(*list(self.containers.values())):
      container.Delete()

  def DeleteServices(self):
    """Delete services belonging to the cluster."""
    for service in self.services.values():
      service.Delete()

  def GetResourceMetadata(self):
    """Returns a dictionary of cluster metadata."""
    nodepools_metadata = {}
    for name, nodepool in self.nodepools.items():
      nodepool_metadata = {
          'size': nodepool.num_nodes,
          'machine_type': nodepool.machine_type,
          'name': name,
      }
      if nodepool.sandbox_config is not None:
        nodepool_metadata['sandbox_config'] = {
            'type': nodepool.sandbox_config.type,
        }
      nodepools_metadata[name] = nodepool_metadata
    metadata = {
        'cloud': self.CLOUD,
        'cluster_type': self.CLUSTER_TYPE,
        'zone': self.default_nodepool.zone,
        'size': self.default_nodepool.num_nodes,
        'machine_type': self.default_nodepool.machine_type,
        'nodepools': nodepools_metadata,
    }
    # Only report autoscaling bounds when they differ from the static size.
    if (
        self.min_nodes != self.default_nodepool.num_nodes
        or self.max_nodes != self.default_nodepool.num_nodes
    ):
      metadata.update({
          'max_size': self.max_nodes,
          'min_size': self.min_nodes,
      })
    return metadata

  def DeployContainer(self, name, container_spec):
    """Deploys Containers according to the ContainerSpec."""
    raise NotImplementedError()

  def DeployContainerService(self, name, container_spec):
    """Deploys a ContainerService according to the ContainerSpec."""
    raise NotImplementedError()

  def GetSamples(self):
    """Return samples with information about deployment times."""
    samples = super().GetSamples()
    for container in itertools.chain(*list(self.containers.values())):
      metadata = {'image': container.image.split('/')[-1]}
      # Only emit timing samples when both endpoints were recorded.
      if container.resource_ready_time and container.create_start_time:
        samples.append(
            sample.Sample(
                'Container Deployment Time',
                container.resource_ready_time - container.create_start_time,
                'seconds',
                metadata,
            )
        )
      if container.delete_end_time and container.delete_start_time:
        samples.append(
            sample.Sample(
                'Container Delete Time',
                container.delete_end_time - container.delete_start_time,
                'seconds',
                metadata,
            )
        )
    for service in self.services.values():
      metadata = {'image': service.image.split('/')[-1]}
      if service.resource_ready_time and service.create_start_time:
        samples.append(
            sample.Sample(
                'Service Deployment Time',
                service.resource_ready_time - service.create_start_time,
                'seconds',
                metadata,
            )
        )
      if service.delete_end_time and service.delete_start_time:
        samples.append(
            sample.Sample(
                'Service Delete Time',
                service.delete_end_time - service.delete_start_time,
                'seconds',
                metadata,
            )
        )
    return samples

  def ResizeNodePool(self, new_size: int, node_pool: str = DEFAULT_NODEPOOL):
    """Change the number of nodes in the node pool."""
    raise NotImplementedError
def GetContainerClusterClass(
    cloud: str, cluster_type: str
) -> type[BaseContainerCluster]:
  """Looks up the cluster class registered for a cloud and cluster type."""
  return resource.GetResourceClass(
      BaseContainerCluster, CLOUD=cloud, CLUSTER_TYPE=cluster_type
  )
class KubernetesPod:
  """Representation of a Kubernetes pod.

  It can be created as a PKB managed resource using KubernetesContainer,
  or created with ApplyManifest and directly constructed.
  """

  def __init__(self, name=None, **kwargs):
    super().__init__(**kwargs)
    # name is a kwarg so this class can cooperate in multiple inheritance
    # (see KubernetesContainer), but it is effectively required.
    assert name
    self.name = name

  def _GetPod(self) -> dict[str, Any]:
    """Gets a representation of the POD and returns it."""
    stdout, _, _ = RunKubectlCommand(['get', 'pod', self.name, '-o', 'yaml'])
    pod = yaml.safe_load(stdout)
    self.ip_address = pod.get('status', {}).get('podIP')
    return pod

  def _ValidatePodHasNotFailed(self, status: dict[str, Any]):
    """Raises an exception if the pod has failed.

    Args:
      status: The pod's .status block as parsed from kubectl output.

    Raises:
      FatalContainerException: If the pod failed, is unschedulable, or its
        container image cannot be pulled.
    """
    # Inspect the pod's status to determine if it succeeded, has failed, or is
    # doomed to fail.
    # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
    phase = status['phase']
    if phase == 'Succeeded':
      return
    elif phase == 'Failed':
      raise FatalContainerException(
          f'Pod {self.name} failed:\n{yaml.dump(status)}'
      )
    else:
      for condition in status.get('conditions', []):
        # 'reason' and 'message' are optional fields on a condition; use .get
        # so a condition without them cannot raise KeyError here.
        if (
            condition['type'] == 'PodScheduled'
            and condition['status'] == 'False'
            and condition.get('reason') == 'Unschedulable'
        ):
          # TODO(pclay): Revisit this when we scale clusters.
          raise FatalContainerException(
              f"Pod {self.name} failed to schedule:\n{condition.get('message')}"
          )
      for container_status in status.get('containerStatuses', []):
        waiting_status = container_status['state'].get('waiting', {})
        if waiting_status.get('reason') in [
            'ErrImagePull',
            'ImagePullBackOff',
        ]:
          raise FatalContainerException(
              f'Failed to find container image for {self.name}:\n'
              + yaml.dump(waiting_status.get('message'))
          )

  def WaitForExit(self, timeout: int | None = None) -> dict[str, Any]:
    """Gets the finished running container.

    Args:
      timeout: Seconds to wait for the pod to finish; None waits indefinitely.

    Returns:
      The pod dict as reported by kubectl.

    Raises:
      FatalContainerException: If the pod fails.
      RetriableContainerException: If the pod has not finished within the
        timeout.
    """

    @vm_util.Retry(
        timeout=timeout, retryable_exceptions=(RetriableContainerException,)
    )
    def _WaitForExit():
      pod = self._GetPod()
      status = pod['status']
      self._ValidatePodHasNotFailed(status)
      phase = status['phase']
      if phase == 'Succeeded':
        return pod
      else:
        raise RetriableContainerException(
            f'Pod phase ({phase}) not in finished phases.'
        )

    return _WaitForExit()

  def GetLogs(self):
    """Returns the logs from the container."""
    stdout, _, _ = RunKubectlCommand(['logs', self.name])
    return stdout
# Order KubernetesPod first so that it's constructor is called first.
class KubernetesContainer(KubernetesPod, BaseContainer):
  """A KubernetesPod based flavor of Container."""

  def _Create(self):
    """Creates the container."""
    # Tolerate arch taints so the pod may also be scheduled on ARM nodes.
    overrides = {
        'spec': {
            'tolerations': [{
                'operator': 'Exists',
                'key': 'kubernetes.io/arch',
                'effect': 'NoSchedule',
            }]
        }
    }
    run_cmd = [
        'run',
        self.name,
        '--image=%s' % self.image,
        '--restart=Never',
        '--overrides',
        json.dumps(overrides),
    ]
    resource_limits = []
    if self.cpus:
      resource_limits.append(f'cpu={int(1000 * self.cpus)}m')
    if self.memory:
      resource_limits.append(f'memory={self.memory}Mi')
    if resource_limits:
      run_cmd.append('--limits=' + ','.join(resource_limits))
    if self.command:
      run_cmd.extend(['--command', '--'])
      run_cmd.extend(self.command)
    RunKubectlCommand(run_cmd)

  def _Delete(self):
    """Deletes the container."""
    pass

  def _IsReady(self):
    """Returns true if the container has stopped pending."""
    status = self._GetPod()['status']
    super()._ValidatePodHasNotFailed(status)
    return status['phase'] != 'Pending'
class KubernetesContainerService(BaseContainerService):
  """A Kubernetes flavor of Container Service."""

  def __init__(self, container_spec, name):
    super().__init__(container_spec)
    self.name = name
    self.port = 8080

  def _Create(self):
    """Creates the deployment, exposes it, and sets up an ingress."""
    run_cmd = [
        'run',
        self.name,
        '--image=%s' % self.image,
        '--port',
        str(self.port),
    ]
    resource_limits = []
    if self.cpus:
      resource_limits.append(f'cpu={int(1000 * self.cpus)}m')
    if self.memory:
      resource_limits.append(f'memory={self.memory}Mi')
    if resource_limits:
      run_cmd.append('--limits=' + ','.join(resource_limits))
    if self.command:
      run_cmd.extend(['--command', '--'])
      run_cmd.extend(self.command)
    RunKubectlCommand(run_cmd)

    # Expose the deployment through a NodePort service.
    RunKubectlCommand([
        'expose',
        'deployment',
        self.name,
        '--type',
        'NodePort',
        '--target-port',
        str(self.port),
    ])
    # Front the NodePort service with an ingress for external access.
    with vm_util.NamedTemporaryFile() as tf:
      tf.write(_K8S_INGRESS.format(service_name=self.name))
      tf.close()
      kubernetes_helper.CreateFromFile(tf.name)

  def _GetIpAddress(self):
    """Attempts to set the Service's ip address."""
    stdout, _, _ = RunKubectlCommand([
        'get',
        'ing',
        '%s-ingress' % self.name,
        '-o',
        'jsonpath={.status.loadBalancer.ingress[*].ip}',
    ])
    if stdout:
      self.ip_address = stdout

  def _IsReady(self):
    """Returns True if the Service is ready."""
    if self.ip_address is None:
      self._GetIpAddress()
    if self.ip_address is not None:
      url = 'http://%s' % (self.ip_address)
      if requests.get(url).status_code == 200:
        return True
    return False

  def _Delete(self):
    """Deletes the service."""
    with vm_util.NamedTemporaryFile() as tf:
      tf.write(_K8S_INGRESS.format(service_name=self.name))
      tf.close()
      kubernetes_helper.DeleteFromFile(tf.name)
    RunKubectlCommand(
        ['delete', 'deployment', self.name], raise_on_failure=False
    )
class KubernetesClusterCommands:
"""Implementation of many Kubernetes commands.
All methods just call generic kubectl commands without needing instance
information.
"""
@staticmethod
def _DeleteAllFromDefaultNamespace():
"""Deletes all resources from a namespace.
Since StatefulSets do not reclaim PVCs upon deletion, they are explicitly
deleted here to prevent dynamically provisioned PDs from leaking once the
cluster has been deleted.
"""
try:
# Delete deployments first as otherwise autorepair will redeploy deleted
# pods.
run_cmd = ['delete', 'deployment', '--all', '-n', 'default']
RunRetryableKubectlCommand(run_cmd)
timeout = 60 * 20
run_cmd = [
'delete',
'all',
'--all',
'-n',
'default',
f'--timeout={timeout}s',
]
RunRetryableKubectlCommand(run_cmd, timeout=timeout)
run_cmd = ['delete', 'pvc', '--all', '-n', 'default']
RunKubectlCommand(run_cmd)
# There maybe a slight race if resources are cleaned up in the background
# where deleting the cluster immediately prevents the PVCs from being
# deleted.
logging.info(
'Sleeping for %s seconds to give resources time to delete.',
RESOURCE_DELETE_SLEEP_SECONDS,
)
time.sleep(RESOURCE_DELETE_SLEEP_SECONDS)
except (
errors.VmUtil.IssueCommandTimeoutError,
vm_util.TimeoutExceededRetryError,
) as e:
raise errors.Resource.RetryableDeletionError(
'Timed out while deleting all resources from default namespace. We'
' should still continue trying to delete everything.'
) from e
@staticmethod
def ApplyManifest(
manifest_file: str, should_log_file: bool = True, **kwargs
) -> Iterator[str]:
"""Applies a declarative Kubernetes manifest; possibly with jinja.
Args:
manifest_file: The name of the YAML file or YAML template.
should_log_file: Whether to log the rendered manifest to stdout or not.
**kwargs: Arguments to the jinja template.
Returns:
Names of the resources, e.g. [deployment.apps/mydeploy, pod/foo]
"""
def _ParseApplyOutput(stdout: str) -> Iterator[str]:
"""Parses the output of kubectl apply to get the name of the resource."""
# Example input: deployment.apps/pkb123 created
for line in stdout.splitlines():
match = re.search(r'([^\s/]+/[^\s/]+) created', line)
if match:
yield match.group(1)
filename = data.ResourcePath(manifest_file)
if not filename.endswith('.j2'):
assert not kwargs
out, _, _ = RunKubectlCommand(['apply', '-f', filename])
return _ParseApplyOutput(out)
environment = jinja2.Environment(undefined=jinja2.StrictUndefined)
with open(filename) as template_file, vm_util.NamedTemporaryFile(
mode='w', suffix='.yaml'
) as rendered_template:
manifest = environment.from_string(template_file.read()).render(kwargs)
rendered_template.write(manifest)
rendered_template.close()
if should_log_file:
logging.info(
'Rendered manifest file %s with contents:\n%s',
rendered_template.name,
manifest,
)
out, _, _ = RunKubectlCommand(['apply', '-f', rendered_template.name])
return _ParseApplyOutput(out)
@staticmethod
def WaitForResource(
resource_name: str,
condition_name: str,
namespace: str | None = None,
timeout: int = vm_util.DEFAULT_TIMEOUT,
wait_for_all: bool = False,
condition_type='condition=',
):
"""Waits for a condition on a Kubernetes resource (eg: deployment, pod)."""
run_cmd = [
'wait',
f'--for={condition_type}{condition_name}',
f'--timeout={timeout}s',
resource_name,
]
if namespace:
run_cmd.append(f'--namespace={namespace}')
if wait_for_all:
run_cmd.append('--all')
RunKubectlCommand(run_cmd, timeout=timeout + 10)
@staticmethod
def WaitForSucceeded(
resource_name: str,
namespace: str | None = None,
timeout: int = vm_util.DEFAULT_TIMEOUT,
raise_on_failure: bool = True,
) -> tuple[str, str, int]:
"""Waits for a resource to complete (i.e. .status.phase=='Succeeded')."""
run_cmd = [
'wait',
'--for=jsonpath={.status.phase}=Succeeded',
f'--timeout={timeout}s',
resource_name,
]
if namespace:
run_cmd.append(f'--namespace={namespace}')
return RunKubectlCommand(
run_cmd, timeout=timeout + 10, raise_on_failure=raise_on_failure
)
@staticmethod
def WaitForRollout(
resource_name: str, timeout: int = vm_util.DEFAULT_TIMEOUT
):
"""Blocks until a Kubernetes rollout is completed."""
run_cmd = [
'rollout',
'status',
'--timeout=%ds' % timeout,
resource_name,
]
RunRetryableKubectlCommand(
run_cmd,
timeout=timeout,
)
@staticmethod
@vm_util.Retry(retryable_exceptions=(errors.Resource.RetryableCreationError,))
def GetLoadBalancerIP(service_name: str):
"""Returns the IP address of a LoadBalancer service when ready."""
get_cmd = [
'get',
'service',
service_name,
'-o',
'jsonpath={.status.loadBalancer.ingress[0].ip}',
]
stdout, _, _ = RunKubectlCommand(get_cmd)
try:
# Ensure the load balancer is ready by parsing the output IP
ip_address = ipaddress.ip_address(stdout)
except ValueError:
raise errors.Resource.RetryableCreationError(
"Load Balancer IP for service '%s' is not ready." % service_name
)
return format(ip_address)
@staticmethod
@vm_util.Retry(retryable_exceptions=(errors.Resource.RetryableCreationError,))
def GetClusterIP(service_name: str) -> str:
"""Returns the IP address of a ClusterIP service when ready."""
get_cmd = [
'get',
'service',
service_name,
'-o',
'jsonpath={.spec.clusterIP}',
]
stdout, _, _ = RunKubectlCommand(get_cmd)
if not stdout:
raise errors.Resource.RetryableCreationError(
"ClusterIP for service '%s' is not ready." % service_name
)
return stdout
@staticmethod
def GetNumReplicasSamples(
resource_name: str, namespace: Optional[str] = None
) -> list[Sample]:
"""Returns a count of the replias (and state) for the specified resource.
The number of ready and unready replicas should always sum to the total
replicas.
Args:
resource_name: The deployment/statefulset/etc's name, e.g.
'deployment/my_deployment'.
namespace: The namespace of the resource. If omitted, the 'default'
namespace will be used.
Returns:
A list of the (total replicas, ready replicas, unready replicas) for this
resource (as `Sample`s), or an empty list if the resource cannot be found.
"""
now = int(time.time())
if namespace is None:
namespace = 'default'
stdout, stderr, retcode = RunKubectlCommand(
[
'get',
resource_name,
'-n',
namespace,
"-o=jsonpath='{.status.replicas}, {.status.readyReplicas}'",
],
raise_on_failure=False,
)
if retcode != 0:
if re.match('^Error from server \\(NotFound\\):.*', stderr) is not None:
# The specified resource wasn't found
return []
else:
# Some other error.
raise errors.VmUtil.IssueCommandError(
f'Unable to query list of replicas: {stderr}'
)
stdout = stdout.strip("' ")
replicas = int(stdout.split(',')[0])
ready_replicas = int(stdout.split(',')[1])
unready_replicas = replicas - ready_replicas
def _Sample(count: int, state: str) -> Sample:
return Sample(
metric='k8s/num_replicas_' + state,
value=count,
unit='',
metadata={
'namespace': namespace,
'resource_name': resource_name,
},
timestamp=now,
)
return [
_Sample(replicas, 'any'),
_Sample(ready_replicas, 'ready'),
_Sample(unready_replicas, 'unready'),
]
@staticmethod
def GetNumNodesSamples() -> list[Sample]:
"""Returns a count of nodes in each state for the cluster.
The number of ready, unready, and unknown nodes should always sum to the
total nodes.
Returns:
A List of the (total nodes, ready nodes, unready nodes, unknown nodes)
for this cluster as `Sample`s.
"""
now = int(time.time())
jsonpath = (
'{range .items[*]}'
'{@.status.conditions[?(@.type=="Ready")].status}{"\\n"}'
'{end}'
)
stdout, _, _ = RunKubectlCommand(
['get', 'nodes', f"-o=jsonpath='{jsonpath}'"]
)
total = ready = unready = unknown = 0
for line in stdout.splitlines():
status = line.strip("' ").lower()
if not status:
continue
elif status == 'true':
ready += 1
elif status == 'false':
unready += 1
else:
# status should be strictly 'unknown', but we'll also categorize any
# other unexpected response as 'unknown'
unknown += 1
total += 1
def _Sample(count: int, state: str) -> Sample:
# TOCONSIDER: maybe include the nodepool name in the metadata?
return Sample(
metric='k8s/num_nodes_' + state,
value=count,
unit='',
metadata={},
timestamp=now,
)
return [
_Sample(total, 'any'),
_Sample(ready, 'ready'),
_Sample(unready, 'unready'),
_Sample(unknown, 'unknown'),
]
@staticmethod
def CreateConfigMap(name: str, from_file_dir: str):
"""Creates a Kubernetes ConfigMap.
Args:
name: The name of the ConfigMap to create
from_file_dir: The directory name containing files that will be key/values
in the ConfigMap
"""
RunKubectlCommand(
['create', 'configmap', name, '--from-file', from_file_dir]
)
@staticmethod
def CreateServiceAccount(
name: str, clusterrole: str | None = None, namespace='default'
):
"""Create a k8s service account and cluster-role-binding."""
RunKubectlCommand(
['create', 'serviceaccount', name, '--namespace', namespace]
)
if clusterrole:
# TODO(pclay): Support customer cluster roles?
RunKubectlCommand([
'create',
'clusterrolebinding',
f'{name}-role',
f'--clusterrole={clusterrole}',
f'--serviceaccount={namespace}:{name}',
'--namespace',
namespace,
])
@property
@functools.lru_cache(maxsize=1)
def k8s_version(self) -> str:
"""Actual Kubernetes version reported by server."""
stdout, _, _ = RunKubectlCommand(['version', '-o', 'yaml'])
return yaml.safe_load(stdout)['serverVersion']['gitVersion']
@staticmethod
def GetPodLabel(resource_name):
run_cmd = [
'get',
resource_name,
'-o',
'jsonpath="{.spec.selector.matchLabels.app}"',
]
stdout, _, _ = RunKubectlCommand(run_cmd)
return yaml.safe_load(stdout)
@staticmethod
def GetPodIpsByLabel(pod_label_key, pod_label_value) -> list[str]:
"""Returns a list of internal IPs for pod label key-value.
Args:
pod_label_key: The pod label name
pod_label_value: The pod label value
"""
get_cmd = [
'get',
'pods',
'-l',
f'{pod_label_key}={pod_label_value}',
'-o',
'jsonpath="{.items[*].status.podIP}"',
]
stdout, _, _ = RunKubectlCommand(get_cmd)
return yaml.safe_load(stdout).split()
@staticmethod
def GetPodIps(resource_name) -> list[str]:
"""Returns a list of internal IPs for a pod name.
Args:
resource_name: The pod resource name
"""
pod_label = KubernetesClusterCommands.GetPodLabel(resource_name)
return KubernetesClusterCommands.GetPodIpsByLabel('app', pod_label)
@staticmethod
def GetPodNames() -> list[str]:
"""Returns all pod names in the cluster."""
return KubernetesClusterCommands.GetAllNamesForResourceType('pods')
@staticmethod
def GetNodeNames() -> list[str]:
"""Get the node names for the cluster."""
return KubernetesClusterCommands.GetAllNamesForResourceType('nodes')
@staticmethod
def GetAllNamesForResourceType(resource_type: str) -> list[str]:
"""Get all names for the specified resource. Type should be plural."""
stdout, _, _ = RunKubectlCommand(
['get', resource_type, '-o', 'jsonpath={.items[*].metadata.name}']
)
return stdout.split()
@staticmethod
def RunKubectlExec(pod_name, cmd):
run_cmd = ['exec', '-it', pod_name, '--'] + cmd
RunKubectlCommand(run_cmd)
@staticmethod
def _GetPvcs() -> Sequence[Any]:
stdout, _, _ = RunKubectlCommand(['get', 'pvc', '-o', 'yaml'])
return yaml.safe_load(stdout)['items']
@staticmethod
def _GetEvents(**kwargs) -> set['KubernetesEvent']:
"""Get the events for the cluster."""
stdout, _, _ = RunKubectlCommand(['get', 'events', '-o', 'yaml'], **kwargs)
k8s_events = set()
for item in yaml.safe_load(stdout)['items']:
k8s_event = KubernetesEvent.FromDict(item)
if k8s_event:
k8s_events.add(k8s_event)
return k8s_events
class KubernetesEventPoller:
  """Wrapper which polls for Kubernetes events.

  A daemon child process periodically calls `get_events_fn` and funnels the
  results back to the parent through a multiprocessing queue, so events
  emitted mid-benchmark are captured even if the cluster later rotates them
  out of `kubectl get events`.
  """

  # Seconds between polls (40 minutes).
  _POLL_INTERVAL_SECONDS = 60 * 40

  def __init__(self, get_events_fn: Callable[[], set['KubernetesEvent']]):
    """Initializes the poller (does not start it; see StartPolling).

    Args:
      get_events_fn: Callable returning the current set of cluster events.
    """
    self.get_events_fn = get_events_fn
    # Union of every event seen so far, deduplicated by value equality.
    self.polled_events: set['KubernetesEvent'] = set()
    self.stop_polling = multiprocessing.Event()
    self.event_queue: multiprocessing.Queue = multiprocessing.Queue()
    self.event_poller = multiprocessing.Process(
        target=self._PollForEvents,
        args=(self.event_queue, self.stop_polling),
    )
    # Daemonize so a stuck poller cannot keep the PKB process alive.
    self.event_poller.daemon = True

  def _PollForEvents(
      self,
      queue: multiprocessing.Queue,
      stop_polling: synchronize.Event,
  ):
    """Polls for events & (ideally asynchronously) waits to poll again.

    Results are appended to the queue. Timeouts are ignored.

    Args:
      queue: The queue to append events to.
      stop_polling: Stop polling when set.
    """
    while True:
      try:
        k8s_events = self.get_events_fn()
        logging.info(
            'From async get events process, got %s events', len(k8s_events)
        )
        for event in k8s_events:
          queue.put(event)
      except errors.VmUtil.IssueCommandTimeoutError:
        logging.info(
            'Async get events command timed out. This may result in missing'
            ' events, but is not a reason to fail the benchmark.'
        )
      # Event.wait returns True as soon as the event is set, so shutdown is
      # immediate instead of busy-sleeping in 1-second increments.
      if stop_polling.wait(self._POLL_INTERVAL_SECONDS):
        return

  def StartPolling(self):
    """Starts polling for events."""
    self.event_poller.start()

    # Stop polling events even if the resource is not deleted.
    def _StopPollingConnected(unused1, **kwargs):
      del unused1, kwargs
      self.StopPolling()

    events.benchmark_end.connect(_StopPollingConnected, weak=False)

  def StopPolling(self):
    """Stops polling for events, joining the poller process."""
    logging.info('Stopping event poller')
    self.stop_polling.set()
    # Drain anything the poller already produced before tearing it down.
    while not self.event_queue.empty():
      self.polled_events.add(self.event_queue.get())
    self.event_poller.join(timeout=30)
    if self.event_poller.is_alive():
      logging.warning(
          'Event poller process did not join in 30 seconds; killing it.'
      )
      self.event_poller.kill()
      self.event_poller.join(timeout=30)

  def GetEvents(self) -> set['KubernetesEvent']:
    """Gets the events for the cluster, including previously polled events."""
    k8s_events = self.get_events_fn()
    self.polled_events.update(k8s_events)
    while not self.event_queue.empty():
      self.polled_events.add(self.event_queue.get())
    return self.polled_events
class KubernetesCluster(BaseContainerCluster, KubernetesClusterCommands):
  """A Kubernetes flavor of Container Cluster."""

  CLUSTER_TYPE = KUBERNETES

  def __init__(self, cluster_spec: container_spec_lib.ContainerClusterSpec):
    super().__init__(cluster_spec)
    self.event_poller: KubernetesEventPoller | None = None
    if cluster_spec.poll_for_events:

      def _GetEventsNoLogging():
        return self._GetEvents(suppress_logging=True)

      self.event_poller = KubernetesEventPoller(_GetEventsNoLogging)

  def _PostCreate(self):
    super()._PostCreate()
    if self.event_poller:
      self.event_poller.StartPolling()

  def _Delete(self):
    # Stop the background poller before tearing down the namespace so the
    # child process does not race cluster deletion.
    if self.event_poller:
      self.event_poller.StopPolling()
    self._DeleteAllFromDefaultNamespace()

  def GetEvents(self) -> set['KubernetesEvent']:
    """Gets the events for the cluster, including previously polled events."""
    if self.event_poller:
      return self.event_poller.GetEvents()
    return self._GetEvents()

  def __getstate__(self):
    # The poller holds a multiprocessing.Process, which cannot be pickled;
    # drop it when serializing the cluster.
    state = self.__dict__.copy()
    del state['event_poller']
    return state

  def GetResourceMetadata(self):
    """Returns a dict containing metadata about the cluster."""
    result = super().GetResourceMetadata()
    if self.created:
      result['version'] = self.k8s_version
    return result

  def DeployContainer(
      self, name: str, container_spec: container_spec_lib.ContainerSpec
  ):
    """Deploys Containers according to the ContainerSpec."""
    base_name = name
    # Suffix with the replica index so repeated deploys get unique names.
    name = base_name + str(len(self.containers[base_name]))
    container = KubernetesContainer(container_spec=container_spec, name=name)
    self.containers[base_name].append(container)
    container.Create()

  def DeployContainerService(
      self, name: str, container_spec: container_spec_lib.ContainerSpec
  ):
    """Deploys a ContainerService according to the ContainerSpec."""
    service = KubernetesContainerService(container_spec, name)
    self.services[name] = service
    service.Create()

  @functools.cached_property
  def node_memory_allocatable(self) -> units.Quantity:
    """Usable memory of each node in cluster in KiB.

    Cached per instance via cached_property (resolves the old TODO to move
    off `@property` + `lru_cache`, which keyed the cache on `self`).
    """
    stdout, _, _ = RunKubectlCommand(
        # TODO(pclay): Take a minimum of all nodes?
        ['get', 'nodes', '-o', 'jsonpath={.items[0].status.allocatable.memory}']
    )
    return units.ParseExpression(stdout)

  @functools.cached_property
  def node_num_cpu(self) -> int:
    """vCPU of each node in cluster. Cached per instance."""
    stdout, _, _ = RunKubectlCommand(
        ['get', 'nodes', '-o', 'jsonpath={.items[0].status.capacity.cpu}']
    )
    return int(stdout)

  def LabelDisks(self):
    """Propagate cluster labels to disks if not done by cloud provider."""
    pass

  # TODO(pclay): integrate with kubernetes_disk.
  def GetDefaultStorageClass(self) -> str:
    """Get the default storage class for the provider."""
    raise NotImplementedError

  def GetNodeSelectors(self) -> list[str]:
    """Get the node selectors section of a yaml for the provider."""
    return []
@dataclasses.dataclass(eq=True, frozen=True)
class KubernetesEventResource:
"""Holder for Kubernetes event involved objects."""
kind: str
name: str | None
@classmethod
def FromDict(cls, yaml_data: dict[str, Any]) -> 'KubernetesEventResource':
"""Parse Kubernetes Event YAML output."""
return cls(kind=yaml_data['kind'], name=yaml_data.get('name'))
@dataclasses.dataclass(eq=True, frozen=True)
class KubernetesEvent:
  """Holder for Kubernetes event data."""
  # The object the event refers to (e.g. a Pod or Node).
  resource: KubernetesEventResource
  # Human-readable description of the event.
  message: str
  # Reason is actually more of a machine readable message.
  reason: str | None
  # Event time in seconds since the Unix epoch.
  timestamp: float
  @classmethod
  def FromDict(cls, yaml_data: dict[str, Any]) -> 'KubernetesEvent | None':
    """Parse Kubernetes Event YAML output.

    Args:
      yaml_data: One item from `kubectl get events -o yaml` output.

    Returns:
      The parsed KubernetesEvent, or None when the item has no message or
      lacks the fields needed to build one.
    """
    if 'message' not in yaml_data:
      return None
    try:
      # There are multiple timestamps. They should be equivalent.
      raw_timestamp = yaml_data['lastTimestamp']
      # An empty/None timestamp is treated like a missing key via the
      # AssertionError handler below.
      assert raw_timestamp
      # Python 3.10 cannot handle Z as utc in ISO 8601 timestamps
      python_3_10_compatible_timestamp = re.sub('Z$', '+00:00', raw_timestamp)
      # NOTE(review): timetuple() discards tzinfo, and timegm interprets the
      # wall-clock fields as UTC. That is correct for 'Z'/'+00:00' stamps;
      # a non-UTC offset would be mis-converted — assumes lastTimestamp is
      # always UTC, TODO confirm.
      timestamp = calendar.timegm(
          datetime.datetime.fromisoformat(
              python_3_10_compatible_timestamp
          ).timetuple()
      )
      return cls(
          message=yaml_data['message'],
          reason=yaml_data.get('reason'),
          resource=KubernetesEventResource.FromDict(
              yaml_data['involvedObject']
          ),
          timestamp=timestamp,
      )
    except (AssertionError, KeyError) as e:
      # Malformed events are logged and skipped rather than failing the run.
      logging.warning(
          'Tried parsing event: %s but ran into error: %s', yaml_data, e
      )
      return None