in perfkitbenchmarker/linux_packages/ycsb.py [0:0]
def RunStaircaseLoads(self, vms, workloads, **kwargs):
  """Run each workload in 'workloads' in succession.

  A staircase load is applied for each workload file, for each entry in
  ycsb_threads_per_client. When --ycsb_dynamic_load is set, each step is
  followed by a series of runs whose target throughput is adjusted via
  self._GetRunLoadTarget until it returns None.

  The YCSB parameters are rebuilt for every workload, and every individual
  run works on its own copy of them. Per-run values ('threads', 'target',
  'maxexecutiontime') therefore never carry over into later steps or
  workloads.

  Args:
    vms: List of VirtualMachine objects to generate load from.
    workloads: List of workload file names.
    **kwargs: Additional parameters to pass to each run. See constructor for
      options.

  Returns:
    List of sample.Sample objects.

  Raises:
    errors.Benchmarks.RunError: if a target QPS is supplied both through the
      flags (--ycsb_threads_per_client / --ycsb_target_qps) and through
      kwargs.
  """
  all_results = []

  @vm_util.Retry(
      retryable_exceptions=ycsb_stats.CombineHdrLogError,
      timeout=-1,
      max_retries=5,
  )
  def _DoRunStairCaseLoad(
      base_parameters: Mapping[str, Any],
      client_count: int,
      target_qps_per_vm: int,
      workload_meta: Mapping[str, Any],
      is_sustained: bool = False,
  ):
    """Runs one load step on all VMs.

    Per-client samples (when --ycsb_include_individual_results is set and
    there is more than one result) are appended directly to all_results.

    Returns:
      A tuple (sum of 'overall Throughput' sample values, combined samples).
    """
    # Private copy so the per-run keys written below never leak into the
    # next run, the next staircase step, or the next workload.
    parameters = dict(base_parameters)
    parameters['threads'] = client_count
    if target_qps_per_vm:
      # Threads should be less than the target QPS since YCSB throttles
      # weirdly when threads is much larger than target.
      parameters['threads'] = min(client_count, target_qps_per_vm)
      parameters['target'] = int(target_qps_per_vm * len(vms))
    if is_sustained:
      parameters['maxexecutiontime'] = (
          FLAGS.ycsb_dynamic_load_sustain_timelimit
      )
    start = time.time()
    results = self._RunThreaded(vms, **parameters)
    events.record_event.send(
        type(self).__name__,
        event='run',
        start_timestamp=start,
        end_timestamp=time.time(),
        metadata=copy.deepcopy(parameters),
    )
    client_meta = workload_meta.copy()
    client_meta.update(parameters)
    client_meta.update(
        clients=len(vms) * client_count,
        threads_per_client_vm=client_count,
    )
    # Values passed in via this flag do not get recorded in metadata.
    # The target passed in is applied to each client VM, so multiply by
    # len(vms).
    for pv in FLAGS.ycsb_run_parameters:
      param, value = pv.split('=', 1)
      if param == 'target':
        value = int(value) * len(vms)
      client_meta[param] = value
    if FLAGS.ycsb_include_individual_results and len(results) > 1:
      for i, result in enumerate(results):
        all_results.extend(
            ycsb_stats.CreateSamples(
                ycsb_result=result,
                ycsb_version=_YCSB_VERSION.value,
                ycsb_commit=_YCSB_COMMIT.value,
                include_histogram=FLAGS.ycsb_histogram,
                include_command_line=_SHOULD_RECORD_COMMAND_LINE.value,
                result_type='individual',
                result_index=i,
                **client_meta,
            )
        )
    if self.measurement_type == ycsb_stats.HDRHISTOGRAM:
      combined_log = ycsb_stats.CombineHdrHistogramLogFiles(
          self.hdr_dir, parameters['hdrhistogram.output.path'], vms
      )
      parsed_hdr = ycsb_stats.ParseHdrLogs(combined_log)
      combined = ycsb_stats.CombineResults(
          results, self.measurement_type, parsed_hdr
      )
    else:
      combined = ycsb_stats.CombineResults(
          results, self.measurement_type, {}
      )
    run_samples = list(
        ycsb_stats.CreateSamples(
            ycsb_result=combined,
            ycsb_version=_YCSB_VERSION.value,
            ycsb_commit=_YCSB_COMMIT.value,
            include_command_line=_SHOULD_RECORD_COMMAND_LINE.value,
            include_histogram=FLAGS.ycsb_histogram,
            result_type='combined',
            **client_meta,
        )
    )
    overall_throughput = 0
    for s in run_samples:
      if s.metric == 'overall Throughput':
        overall_throughput += s.value
    return overall_throughput, run_samples

  for workload_index, workload_file in enumerate(workloads):
    # Rebuilt from scratch for every workload so nothing carries over from
    # the previous workload, whether or not --ycsb_operation_count is set.
    workload_parameters = {}
    if FLAGS.ycsb_operation_count:
      workload_parameters['operationcount'] = FLAGS.ycsb_operation_count
    if FLAGS.ycsb_record_count:
      workload_parameters['recordcount'] = FLAGS.ycsb_record_count
    if FLAGS.ycsb_field_count:
      workload_parameters['fieldcount'] = FLAGS.ycsb_field_count
    if FLAGS.ycsb_field_length:
      workload_parameters['fieldlength'] = FLAGS.ycsb_field_length
    if FLAGS.ycsb_timelimit:
      workload_parameters['maxexecutiontime'] = FLAGS.ycsb_timelimit
    hdr_files_dir = posixpath.join(self.hdr_dir, str(workload_index))
    if FLAGS.ycsb_measurement_type == ycsb_stats.HDRHISTOGRAM:
      workload_parameters['hdrhistogram.fileoutput'] = True
      workload_parameters['hdrhistogram.output.path'] = hdr_files_dir
    if FLAGS.ycsb_requestdistribution:
      workload_parameters['requestdistribution'] = (
          FLAGS.ycsb_requestdistribution
      )
    if FLAGS.ycsb_readproportion is not None:
      workload_parameters['readproportion'] = FLAGS.ycsb_readproportion
    if FLAGS.ycsb_updateproportion is not None:
      workload_parameters['updateproportion'] = FLAGS.ycsb_updateproportion
    if FLAGS.ycsb_scanproportion is not None:
      workload_parameters['scanproportion'] = FLAGS.ycsb_scanproportion
    workload_parameters.update(kwargs)
    remote_path = posixpath.join(
        linux_packages.INSTALL_DIR, os.path.basename(workload_file)
    )
    with open(workload_file) as fp:
      workload_meta = ParseWorkload(fp.read())
      workload_meta.update(kwargs)
      workload_meta.update(
          workload_name=os.path.basename(workload_file),
          workload_index=workload_index,
          stage='run',
      )
    # dict.fromkeys de-duplicates the VMs while keeping their order.
    args = [
        ((vm, workload_file, remote_path), {}) for vm in dict.fromkeys(vms)
    ]
    background_tasks.RunThreaded(PushWorkload, args)
    workload_parameters['parameter_files'] = [remote_path]

    # _GetThreadsQpsPerLoaderList() passes tuple of (client_count, target=0)
    # if no target is passed via flags.
    for client_count, target_qps_per_vm in _GetThreadsQpsPerLoaderList(
        len(vms)
    ):
      # workload_parameters is never written by the runs themselves, so a
      # 'target' here can only have come from kwargs.
      if target_qps_per_vm > 0 and workload_parameters.get('target', 0) > 0:
        raise errors.Benchmarks.RunError(
            'Target QPS should only be passed in via one of'
            ' --ycsb_threads_per_client, --ycsb_target_qps, or parameters.'
        )
      if 'target' in workload_parameters:
        target_qps_per_vm = int(workload_parameters['target'] / len(vms))
      # Consider refactoring so that target_qps_per_vm doesn't need to be
      # passed in. Target should already be set in parameters.
      target_throughput, run_samples = _DoRunStairCaseLoad(
          workload_parameters, client_count, target_qps_per_vm, workload_meta
      )
      # Uses 5 * unthrottled throughput as starting point.
      target_throughput *= 5
      all_results.extend(run_samples)
      is_sustained = False
      while FLAGS.ycsb_dynamic_load:
        actual_throughput, run_samples = _DoRunStairCaseLoad(
            workload_parameters,
            client_count,
            int(target_throughput // len(vms)),
            workload_meta,
            is_sustained,
        )
        # A run counts as sustained when it reached more than the configured
        # fraction of the requested throughput.
        is_sustained = FLAGS.ycsb_dynamic_load_sustain_throughput_ratio < (
            actual_throughput / target_throughput
        )
        for s in run_samples:
          s.metadata['sustained'] = is_sustained
        all_results.extend(run_samples)
        target_throughput = self._GetRunLoadTarget(
            actual_throughput, is_sustained
        )
        if target_throughput is None:
          break
        if _DYNAMIC_LOAD_SLEEP_SEC.value > 0:
          logging.info(
              'Sleeping %s seconds after dynamic load.',
              _DYNAMIC_LOAD_SLEEP_SEC.value,
          )
          time.sleep(_DYNAMIC_LOAD_SLEEP_SEC.value)
  return all_results