# perfkitbenchmarker/linux_benchmarks/kubernetes_hpa_benchmark.py
# Copyright 2019 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs a locust based hpa benchmark on a k8s cluster."""
from collections.abc import Callable
import functools
import json
import logging
import threading
import typing
from typing import Any, Dict, List
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import benchmark_spec as bm_spec
from perfkitbenchmarker import configs
from perfkitbenchmarker import container_service
from perfkitbenchmarker import errors
from perfkitbenchmarker.linux_packages import locust
from perfkitbenchmarker.sample import Sample
FLAGS = flags.FLAGS
flags.DEFINE_string(
'kubernetes_hpa_runtime_class_name',
None,
'A custom runtimeClassName to apply to the pods.',
)
BENCHMARK_NAME = 'kubernetes_hpa'
BENCHMARK_CONFIG = """
kubernetes_hpa:
description: Benchmarks how quickly hpa reacts to load
vm_groups:
clients:
vm_spec: *default_dual_core
vm_count: 1
container_specs:
kubernetes_fib:
image: fibonacci
container_registry: {}
container_cluster:
cloud: GCP
type: Kubernetes
min_vm_count: 3
max_vm_count: 50
vm_spec: *default_dual_core
flags:
locust_path: locust/rampup.py
"""
_INGRESS_JSONPATH = '{.status.loadBalancer.ingress[0]}'
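# _INGRESS_JSONPATH selects the first entry of the Service's
# .status.loadBalancer.ingress list, which Kubernetes populates with an
# external IP or hostname once the LoadBalancer has been provisioned.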
def GetConfig(user_config: Dict[str, Any]) -> Dict[str, Any]:
"""Load and return benchmark config.
Args:
user_config: user supplied configuration (flags and config file)
Returns:
loaded benchmark configuration
"""
config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
return config
def _PrepareCluster(benchmark_spec: bm_spec.BenchmarkSpec):
"""Prepares a cluster to run the hpa benchmark."""
cluster: container_service.KubernetesCluster = (
benchmark_spec.container_cluster
)
fib_image = benchmark_spec.container_specs['kubernetes_fib'].image
cluster.ApplyManifest(
'container/kubernetes_hpa/fib.yaml.j2',
fib_image=fib_image,
runtime_class_name=FLAGS.kubernetes_hpa_runtime_class_name,
node_selectors=cluster.GetNodeSelectors(),
)
cluster.WaitForResource('deploy/fib', 'available', namespace='fib')
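  # Wait until the fib Service has an external address; the jsonpath condition
  # below is only satisfied once .status.loadBalancer.ingress[0] exists.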
cluster.WaitForResource(
'service/fib',
_INGRESS_JSONPATH,
namespace='fib',
condition_type='jsonpath=',
)
def _PrepareLocust(benchmark_spec: bm_spec.BenchmarkSpec):
"""Prepares a vm to run locust."""
vm = benchmark_spec.vms[0]
locust.Install(vm)
locust.Prep(vm)
def Prepare(benchmark_spec: bm_spec.BenchmarkSpec):
"""Install fib workload (and associated hpa) on the K8s Cluster.
Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
"""
prepare_fns = [
functools.partial(_PrepareCluster, benchmark_spec),
functools.partial(_PrepareLocust, benchmark_spec),
]
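  # Cluster setup and locust-client setup are independent, so run them in
  # parallel to shorten the prepare phase.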
background_tasks.RunThreaded(lambda f: f(), prepare_fns)
def _GetLoadBalancerURI() -> str:
"""Returns the SUT Load Balancer URI."""
stdout, _, _ = container_service.RunKubectlCommand([
'get',
'-n',
'fib',
'svc/fib',
'-o',
f"jsonpath='{_INGRESS_JSONPATH}'",
])
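  # kubectl echoes the jsonpath result inside the single quotes requested
  # above, e.g. '{"ip": "203.0.113.7"}' or '{"hostname": "lb.example.com"}'
  # (illustrative values), so strip the quotes before parsing the JSON.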
ingress = json.loads(stdout.strip("'"))
if 'ip' in ingress:
ip = ingress['ip']
elif 'hostname' in ingress:
ip = ingress['hostname']
else:
    raise errors.Benchmarks.RunError(
        'No IP or hostname found in ingress status; kubectl output: ' + stdout
    )
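  # Port 5000 is assumed to match the port the fib Service exposes in
  # container/kubernetes_hpa/fib.yaml.j2.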
return 'http://' + ip.strip() + ':5000'
def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> List[Sample]:
"""Run a benchmark against the Nginx server."""
addr = _GetLoadBalancerURI()
vm = benchmark_spec.vms[0]
cluster: container_service.KubernetesCluster = typing.cast(
container_service.KubernetesCluster, benchmark_spec.container_cluster
)
samples = []
stop = threading.Event()
kmc = KubernetesMetricsCollector(samples, stop)
def RunLocust():
for s in locust.Run(vm, addr):
samples.append(s)
stop.set()
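  # Run three threads in parallel: two metric observers plus the locust load
  # generator. When locust finishes, RunLocust sets the stop event, which
  # tells both observers to wind down.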
background_tasks.RunThreaded(
lambda f: f(),
[
lambda: kmc.ObserveNumReplicas(cluster, 'deploy/fib', 'fib'),
lambda: kmc.ObserveNumNodes(cluster),
RunLocust,
],
max_concurrent_threads=3,
)
return samples
class KubernetesMetricsCollector:
"""Collects k8s cluster metrics."""
def __init__(self, samples: List[Sample], stop: threading.Event):
"""Creates a KubernetesMetricsCollector.
Args:
samples: List of samples. Observed metrics will be added to this list.
stop: Event indicating when the test is over. Once set, the Observe*()
methods will stop.
"""
self._samples = samples
self._stop = stop
def ObserveNumReplicas(
self,
cluster: container_service.KubernetesCluster,
resource_name: str,
namespace: str = '',
) -> None:
"""Periodically samples the number of replicas.
Adds the result to self._samples.
Expected to be run in a background thread. Never completes until self._stop
is signaled.
Args:
cluster: The cluster in question.
resource_name: The deployment/statefulset/etc's name, e.g.
'deployment/my_deployment'.
namespace: The namespace of the resource. If omitted, the 'default'
namespace will be used.
"""
self._Observe(
lambda: cluster.GetNumReplicasSamples(resource_name, namespace)
)
def ObserveNumNodes(
self,
cluster: container_service.KubernetesCluster,
) -> None:
"""Periodically samples the number of nodes.
Adds result to self._samples.
Expected to be run in a background thread. Never completes until self._stop
is signaled.
Args:
cluster: The cluster in question.
"""
self._Observe(cluster.GetNumNodesSamples)
def _Observe(
self,
observe_fn: Callable[[], List[Sample]],
) -> None:
"""Call the specified function until self._stop is signalled.
Results are appended to self._samples. Timeouts are ignored.
Args:
observe_fn: The function to call.
"""
success_count = 0
failure_count = 0
while True:
try:
self._samples.extend(observe_fn())
success_count += 1
except (
errors.VmUtil.IssueCommandError,
errors.VmUtil.IssueCommandTimeoutError,
) as e:
# Ignore errors, timeouts - there'll be a gap in the data, but that's
# ok.
logging.warning(
'Ignoring exception that occurred while observing cluster: %s', e)
failure_count += 1
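      # stop.wait() doubles as the polling interval: it sleeps for up to one
      # second and returns True once Run() signals that locust has finished.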
if self._stop.wait(timeout=1.0):
if success_count + failure_count == 0:
          raise AssertionError(
              'Expected at least one observation attempt before the stop'
              ' event was set.'
          )
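        # Require that at least 90% of polling attempts succeeded; with more
        # failures than that, the replica/node samples are too sparse to be
        # meaningful.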
assert success_count / (success_count + failure_count) >= 0.90
return
def Cleanup(benchmark_spec):
"""Cleanup."""
del benchmark_spec