# perfkitbenchmarker/linux_packages/locust.py (88 lines of code) (raw):
"""Utilities for managing a locust benchmark on a VM."""
import csv
import enum
import logging
import re
from typing import Iterable, TYPE_CHECKING
from absl import flags
from perfkitbenchmarker import data
from perfkitbenchmarker import sample
if TYPE_CHECKING:
from perfkitbenchmarker import linux_virtual_machine # pylint: disable=g-import-not-at-top
# Module-level alias for absl's global flag-values object.
FLAGS = flags.FLAGS
class Locustfile(enum.Enum):
  """Predefined locustfiles.

  Each member's value is the locustfile path that `GetPath` hands to
  `data.ResourcePath`.
  """

  SIMPLE = 'locust/simple.py'
  RAMPUP = 'locust/rampup.py'

  def GetPath(self):
    """Returns the result of `data.ResourcePath` for this member's value."""
    resource_name = self.value
    return data.ResourcePath(resource_name)
# Flag naming the locustfile that `Prep` copies to the VM. It defaults to None,
# and `Prep` raises if it is still None when called.
_LOCUST_FILE = flags.DEFINE_string(
    name='locust_path',
    default=None,
    help='Path to the locust file to use, e.g. `locust/simple.py`. Required.',
)
def Install(vm: 'linux_virtual_machine.BaseLinuxVirtualMachine') -> None:
  """Installs the `python3-locust` apt package on the given VM.

  Runs `sudo apt update` followed by `sudo apt install python3-locust -y`.
  Locust itself is not started here. Calling this again simply re-runs the same
  two apt commands; no locust process is started or stopped by this function.

  Args:
    vm: Already running VM where locust should be installed.

  Raises:
    errors.VirtualMachine.RemoteCommandError: If an error occurred on the VM.
  """
  # Refresh the package index first, then install non-interactively (-y).
  apt_invocations = (
      ['update'],
      ['install', 'python3-locust', '-y'],
  )
  for apt_args in apt_invocations:
    vm.RunCommand(['sudo', 'apt', *apt_args])
def Prep(
    vm: 'linux_virtual_machine.BaseLinuxVirtualMachine',
) -> None:
  """Copies the locustfile selected by --locust_path to the given VM.

  The flag value is resolved with `data.ResourcePath` and the resulting file is
  copied to `locustfile.py` on the VM. Locust is neither installed nor started
  here. Calling this again copies the file again under the same remote name; no
  locust process is started or stopped by this function.

  Args:
    vm: Already running VM that should receive the locustfile.

  Raises:
    ValueError: If the --locust_path flag was not set.
    errors.VirtualMachine.RemoteCommandError: If an error occurred on the VM.
  """
  requested_locustfile = _LOCUST_FILE.value
  if requested_locustfile is None:
    raise ValueError('Locustfile path must be specified via flag.')

  # The remote name is fixed; `Run` invokes locust with `-f locustfile.py`.
  vm.RemoteCopy(data.ResourcePath(requested_locustfile), 'locustfile.py')
def Run(
    vm: 'linux_virtual_machine.BaseLinuxVirtualMachine', target_host: str
) -> Iterable[sample.Sample]:
  """Runs locust against `target_host` and yields its results as samples.

  This is a generator: nothing is executed on the VM until the returned
  iterable is consumed. Once iteration starts, the locust command runs to
  completion before the first sample is yielded. (Test length is defined by the
  locustfile.) This can be called repeatedly, in which case the test will run
  again. This should not be called repeatedly *in parallel*: every invocation
  uses the same `locustfile.py` and the same `test1` CSV prefix.

  Two groups of samples are produced from `test1_stats_history.csv`:
    1. One set per CSV row, under the 'locust' metric namespace.
    2. The last row once more, under the 'locust_overall' namespace.

  Args:
    vm: Already running VM where locust has been installed (see `Install`) and
      the locustfile has been copied (see `Prep`).
    target_host: The SUT. Can be an ipaddr or hostname. Must include the scheme.
      e.g. 'http://192.168.0.1:8080'

  Yields:
    Samples corresponding to the locust results.

  Raises:
    errors.VirtualMachine.RemoteCommandError: If an error occurred on the VM
      while reading the results file. A non-zero exit code from locust itself
      is only logged, because the command is run with `ignore_failure=True`.
  """
  # Per the Locust CLI docs: --autostart begins the test immediately, --autoquit
  # exits locust the given number of seconds after the run finishes, and --csv
  # sets the prefix of the CSV result files.
  _, stderr, exit_code = vm.RunCommand(
      [
          'locust',
          '-f',
          'locustfile.py',
          '--host',
          target_host,
          '--autostart',
          '--csv',
          'test1',
          '--autoquit',
          '5',
      ],
      ignore_failure=True,
  )

  if exit_code != 0:
    logging.info(
        'Locust had non-zero exit code: %s and error: %s. This may just mean'
        ' there were some number of failing requests, which is acceptable.',
        exit_code,
        stderr,
    )

  history_csv, _, _ = vm.RunCommand(['cat', 'test1_stats_history.csv'])
  yield from _ConvertLocustResultsToSamples(history_csv)

  # Re-export the last row as the "locust_overall" samples. NB:
  # 1. CSV outputs sequentially so last line is last timestamp
  # 2. Timestamps continue to aggregate, so last timestamp is "overall".
  # The header and last row are taken from the contents already fetched above
  # rather than re-reading the file on the VM with a second remote command.
  history_lines = history_csv.splitlines()
  if len(history_lines) < 2:
    # Header only (or empty file): there is no data row to report as overall.
    return
  overall_csv = '\n'.join((history_lines[0], history_lines[-1]))
  yield from _ConvertLocustResultsToSamples(
      overall_csv, metric_namespace='locust_overall'
  )
def _ConvertLocustResultsToSamples(
    locust_results: str,
    metric_namespace: str = 'locust',
) -> Iterable[sample.Sample]:
  """Converts each csv row from locust to PKB samples.

  The first line of `locust_results` is treated as the CSV header. For every
  following row, one sample is yielded per column, except that:
    * the 'Timestamp', 'Type' and 'Name' columns are never emitted as metrics;
    * cells that are 'N/A', empty, or missing (a row shorter than the header)
      are skipped, since they carry no numeric value.

  Args:
    locust_results: CSV text, header line first.
    metric_namespace: Prefix for each metric name; the metric is
      '<metric_namespace>/<sanitized column name>'.

  Yields:
    One sample per usable numeric cell, timestamped with the row's 'Timestamp'
    column and tagged with the --locust_path flag value as metadata.

  Raises:
    ValueError: If a cell that is not skipped cannot be parsed as a number.
  """
  non_metric_columns = ('Timestamp', 'Type', 'Name')
  # csv.DictReader fills cells missing from a short row with None.
  unavailable_values = (None, '', 'N/A')

  reader = csv.DictReader(locust_results.splitlines())
  for row in reader:
    for field in reader.fieldnames:
      if field in non_metric_columns:
        continue
      raw_value = row[field]
      if raw_value in unavailable_values:
        continue
      yield sample.Sample(
          metric=metric_namespace + '/' + _SanitizeFieldName(field),
          value=float(raw_value),
          unit='',
          metadata={'locustfile_path': _LOCUST_FILE.value},
          timestamp=int(row['Timestamp']),
      )
def _SanitizeFieldName(field: str) -> str:
  """Rewrites a locust CSV column name for use in a metric name.

  Every space becomes '_', every '%' becomes 'p', and every '/' becomes
  '_per_'. For example 'Requests/s' -> 'Requests_per_s' and '50%' -> '50p'.

  Args:
    field: Column name as it appears in the locust CSV header.

  Returns:
    The column name with the substitutions above applied.
  """
  # All three patterns are plain literals, so str.replace is sufficient. None
  # of the replacement strings contains a character that is itself replaced,
  # so the order of the substitutions does not affect the result.
  return field.replace(' ', '_').replace('%', 'p').replace('/', '_per_')