perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py (288 lines of code) (raw):
"""Benchmark which runs spins up a large number of pods on kubernetes."""
import collections
from collections import abc
import dataclasses
import json
import time
from typing import Any
from absl import flags
from absl import logging
from dateutil import parser
import numpy as np
from perfkitbenchmarker import benchmark_spec
from perfkitbenchmarker import configs
from perfkitbenchmarker import container_service
from perfkitbenchmarker import errors
from perfkitbenchmarker import sample
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker import vm_util
FLAGS = flags.FLAGS
BENCHMARK_NAME = 'kubernetes_scale'
BENCHMARK_CONFIG = """
kubernetes_scale:
description: Test scaling an auto-scaled Kubernetes cluster by adding pods.
container_cluster:
cloud: GCP
type: Kubernetes
min_vm_count: 1
max_vm_count: 100
vm_spec: *default_dual_core
poll_for_events: true
"""
NUM_PODS = flags.DEFINE_integer(
'kubernetes_scale_num_replicas', 5, 'Number of new instances to create'
)
REPORT_PERCENTILES = flags.DEFINE_boolean(
'kubernetes_scale_report_latency_percentiles',
True,
'Whether to report percentiles of event latencies',
)
REPORT_LATENCIES = flags.DEFINE_boolean(
'kubernetes_scale_report_individual_latencies',
False,
'Whether to report individual event latencies',
)
CPUS_PER_POD = flags.DEFINE_string(
'kubernetes_scale_pod_cpus', '250m', 'CPU limit per pod'
)
MEMORY_PER_POD = flags.DEFINE_string(
'kubernetes_scale_pod_memory', '250M', 'Memory limit per pod'
)
MANIFEST_TEMPLATE = 'container/kubernetes_scale/kubernetes_scale.yaml.j2'
DEFAULT_IMAGE = 'k8s.gcr.io/pause:3.1'
NVIDIA_GPU_IMAGE = 'nvidia/cuda:11.0.3-runtime-ubuntu20.04'
def _GetImage() -> str:
"""Get the image for the scale deployment."""
if virtual_machine.GPU_COUNT.value:
return NVIDIA_GPU_IMAGE
return DEFAULT_IMAGE
def CheckPrerequisites(_):
"""Validate flags and config."""
if not REPORT_PERCENTILES.value and not REPORT_LATENCIES.value:
raise errors.Config.InvalidValue(
'At least one of percentiles or individual latencies must be reported.'
)
def GetConfig(user_config):
"""Loads and returns benchmark config."""
config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
return config
def Prepare(bm_spec: benchmark_spec.BenchmarkSpec):
"""Sets additional spec attributes."""
bm_spec.always_call_cleanup = True
def _GetRolloutCreationTime(rollout_name: str) -> int:
"""Returns the time when the rollout was created."""
out, _, _ = container_service.RunKubectlCommand([
'rollout',
'history',
rollout_name,
'-o',
'jsonpath={.metadata.creationTimestamp}',
])
return ConvertToEpochTime(out)
def _GetScaleTimeout() -> int:
"""Returns the timeout for the scale up & teardown."""
base_timeout = 60 * 10
per_pod_timeout = NUM_PODS.value * 3
proposed_timeout = base_timeout + per_pod_timeout
max_timeout = 2 * 60 * 60 # 2 hours
return min(proposed_timeout, max_timeout)
def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]:
"""Scales a large number of pods on kubernetes."""
assert bm_spec.container_cluster
cluster = bm_spec.container_cluster
assert isinstance(cluster, container_service.KubernetesCluster)
initial_nodes = set(cluster.GetNodeNames())
samples, rollout_name = ScaleUpPods(cluster)
start_time = _GetRolloutCreationTime(rollout_name)
pod_samples = ParseStatusChanges(cluster, 'pod', start_time)
samples += pod_samples
_CheckForFailures(cluster, pod_samples)
samples += ParseStatusChanges(
cluster, 'node', start_time, resources_to_ignore=initial_nodes
)
metadata = {
'pod_memory': MEMORY_PER_POD.value,
'pod_cpu': CPUS_PER_POD.value,
'goal_replicas': NUM_PODS.value,
'image': _GetImage(),
}
if virtual_machine.GPU_COUNT.value:
metadata['gpu_count'] = virtual_machine.GPU_COUNT.value
metadata['gpu_type'] = virtual_machine.GPU_TYPE.value
for s in samples:
s.metadata.update(metadata)
return samples
def ScaleUpPods(
cluster: container_service.KubernetesCluster,
) -> tuple[list[sample.Sample], str]:
"""Scales up pods on a kubernetes cluster. Returns samples & rollout name."""
samples = []
initial_pods = set(cluster.GetPodNames())
logging.info('Initial pods: %s', initial_pods)
command = None
if virtual_machine.GPU_COUNT.value:
# Use nvidia-smi to validate NVIDIA_GPU is available.
command = ['sh', '-c', 'nvidia-smi && sleep 3600']
# Request X new pods via YAML apply.
num_new_pods = NUM_PODS.value
max_wait_time = _GetScaleTimeout()
resource_names = cluster.ApplyManifest(
MANIFEST_TEMPLATE,
Name='kubernetes-scaleup',
Replicas=num_new_pods,
CpuRequest=CPUS_PER_POD.value,
MemoryRequest=MEMORY_PER_POD.value,
NvidiaGpuRequest=virtual_machine.GPU_COUNT.value,
Image=_GetImage(),
Command=command,
EphemeralStorageRequest='10Mi',
RolloutTimeout=max_wait_time,
PodTimeout=max_wait_time + 60,
NodeSelectors=cluster.GetNodeSelectors(),
)
# Arbitrarily pick the first resource (it should be the only one.)
assert resource_names
rollout_name = next(resource_names)
try:
start_polling_time = time.monotonic()
cluster.WaitForRollout(rollout_name, timeout=max_wait_time)
all_new_pods = set(cluster.GetPodNames()) - initial_pods
cluster.WaitForResource(
'pod',
condition_name='Ready',
timeout=max_wait_time,
wait_for_all=True,
namespace='default',
)
end_polling_time = time.monotonic()
logging.info(
'In %d seconds, found all %s new pods',
end_polling_time - start_polling_time,
len(all_new_pods),
)
samples.append(
sample.Sample(
'pod_polling_duration',
end_polling_time - start_polling_time,
'seconds',
)
)
return samples, rollout_name
except (
errors.VmUtil.IssueCommandError,
errors.VmUtil.IssueCommandTimeoutError,
vm_util.TimeoutExceededRetryError,
) as e:
logging.warning(
'Kubernetes failed to wait for all the rollout and/or all pods to be'
' ready, even with retries. Full error: %s. Continuing for now. Failure'
' will be checked later by number of pods with ready events.',
e,
)
return [], rollout_name
def _CheckForFailures(
cluster: container_service.KubernetesCluster,
pod_samples: list[sample.Sample],
):
"""Fails the benchmark if not enough pods were created.
Args:
cluster: The cluster to check for failures on.
pod_samples: The samples from pod transition times which includes pod Ready
count.
Raises:
QuotaFailure: If a quota is exceeded.
RunError: If scale up failed for a non-quota reason.
"""
ready_count_sample = next(
(s for s in pod_samples if s.metric == 'pod_Ready_count'), None
)
if ready_count_sample is None:
raise errors.Benchmarks.RunError(
'No pod ready events were found; this should almost never happen.'
)
if ready_count_sample.value >= NUM_PODS.value:
logging.info(
'Benchmark successfully scaled up %d pods, which is more than the goal'
' of %d pods.',
ready_count_sample.value,
NUM_PODS.value,
)
return
events = cluster.GetEvents()
for event in events:
if event.reason == 'FailedScaleUp' and 'quota exceeded' in event.message:
raise errors.Benchmarks.QuotaFailure(
'Failed to scale up to %d pods, at least one pod ran into a quota'
' error: %s' % (NUM_PODS.value, event.message)
)
raise errors.Benchmarks.RunError(
'Benchmark attempted to scale up to %d pods but only %d pods were'
' created & ready. Check above "Kubernetes failed to wait for" logs for'
' exact failure location.' % (NUM_PODS.value, ready_count_sample.value)
)
@dataclasses.dataclass
class KubernetesResourceStatusCondition:
"""Stores the information of a Kubernetes resource status condition."""
resource_type: str
resource_name: str
epoch_time: int
event: str
@classmethod
def FromJsonPathResult(
cls, resource_type: str, resource_name: str, condition: dict[str, Any]
) -> 'KubernetesResourceStatusCondition':
"""Parses the json result of kubectl get."""
str_time = condition['lastTransitionTime']
return cls(
resource_type,
resource_name,
epoch_time=ConvertToEpochTime(str_time),
event=condition['type'],
)
def _GetResourceStatusConditions(
resource_type: str, resource_name: str
) -> list[KubernetesResourceStatusCondition]:
"""Returns the status conditions for a resource.
Args:
resource_type: The type of the resource to get the status conditions for.
resource_name: The name of the resource to get the status conditions for.
Should not have a prefix (eg pods/).
Returns:
A list of status condition, where each condition is a dict with type &
lastTransitionTime.
"""
out, _, _ = container_service.RunKubectlCommand([
'get',
resource_type,
resource_name,
'-o',
'jsonpath={.status.conditions[*]}',
])
# Turn space separated individual json objects into a single json array.
conditions_str = '[' + out.replace('} {', '},{') + ']'
conditions = json.loads(conditions_str)
return [
KubernetesResourceStatusCondition.FromJsonPathResult(
resource_type, resource_name, condition
)
for condition in conditions
]
def ConvertToEpochTime(timestamp: str) -> int:
"""Converts a timestamp to epoch time."""
# Example: 2024-11-08T23:44:36Z
return parser.parse(timestamp).timestamp()
def ParseStatusChanges(
cluster: container_service.KubernetesCluster,
resource_type: str,
start_time: float,
resources_to_ignore: abc.Set[str] = frozenset(),
) -> list[sample.Sample]:
"""Parses status transitions into samples.
Status transitions are pulled from the cluster, and include e.g. when a pod
transitions to PodScheduled or Ready. See tests for more example input/output.
Args:
cluster: The cluster to parse the status changes for.
resource_type: The type of the resource to parse the status changes for
(node or pod).
start_time: The start time of the scale up operation, subtracted from
timestamps.
resources_to_ignore: A set of resource names to ignore.
Returns:
A list of samples, with various percentiles for each status condition.
"""
all_resources = set(cluster.GetAllNamesForResourceType(resource_type + 's'))
all_resources -= resources_to_ignore
conditions = []
for resource in sorted(all_resources):
conditions += _GetResourceStatusConditions(resource_type, resource)
samples = []
overall_times = collections.defaultdict(list)
for condition in conditions:
overall_times[condition.event].append(condition.epoch_time)
for event, timestamps in overall_times.items():
summaries = _SummarizeTimestamps(timestamps)
prefix = f'{resource_type}_{event}_'
if REPORT_PERCENTILES.value:
for percentile, value in summaries.items():
samples.append(
sample.Sample(
prefix + percentile,
value - start_time,
'seconds',
)
)
# Always report counts, because it is used in failure handling
samples.append(
sample.Sample(
prefix + 'count',
len(timestamps),
'count',
)
)
if REPORT_LATENCIES.value:
for condition in conditions:
metadata = {
'k8s_resource_name': condition.resource_name,
}
samples.append(
sample.Sample(
f'{resource_type}_{condition.event}',
condition.epoch_time - start_time,
'seconds',
metadata,
)
)
return samples
def _SummarizeTimestamps(timestamps: list[float]) -> dict[str, float]:
"""Returns a few metrics about a list of timestamps."""
percentiles = [0, 10, 50, 90, 95, 99.9, 100]
summary = {
'mean': np.mean(timestamps),
}
for percentile in percentiles:
summary[f'p{percentile}'] = np.percentile(timestamps, percentile)
return summary
def Cleanup(_):
"""Cleanups scale benchmark. Runs before teardown."""
container_service.RunRetryableKubectlCommand(
['delete', '--all', 'pods', '-n', 'default'], timeout=_GetScaleTimeout()
)