in osbenchmark/workload/loader.py [0:0]
def on_after_load_workload(self, workload):
    """Downsize a workload so it can be exercised quickly in test mode.

    In test mode we only want to verify that a workload is functional, not to
    benchmark it. Every bulk document corpus is therefore pointed at its
    1000-document ("-1k") companion file and every task schedule is capped to
    at most one iteration per client / a few seconds of runtime.

    :param workload: The fully loaded workload to downsize. Mutated in place.
    :return: The (possibly mutated) ``workload`` instance.
    :raises exceptions.BenchmarkAssertionError: if a bulk document corpus has
        neither a compressed nor an uncompressed source file.
    """
    if not self.test_mode_enabled:
        return workload
    self.logger.info("Preparing workload [%s] for test mode.", str(workload))

    def _downsize_corpus(corpus):
        # Point each bulk document set at its 1000-document companion file
        # and disable size verification.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("Reducing corpus size to 1000 documents for [%s]", corpus.name)
        for document_set in corpus.documents:
            # TODO #341: Should we allow this for snapshots too?
            if document_set.is_bulk:
                document_set.number_of_documents = 1000
                if document_set.has_compressed_corpus():
                    # Archives carry two extensions (e.g. ".json.bz2"); strip
                    # both so the "-1k" marker lands before them.
                    path, ext = io.splitext(document_set.document_archive)
                    path_2, ext_2 = io.splitext(path)
                    document_set.document_archive = f"{path_2}-1k{ext_2}{ext}"
                    document_set.document_file = f"{path_2}-1k{ext_2}"
                elif document_set.has_uncompressed_corpus():
                    path, ext = io.splitext(document_set.document_file)
                    document_set.document_file = f"{path}-1k{ext}"
                else:
                    raise exceptions.BenchmarkAssertionError(f"Document corpus [{corpus.name}] has neither compressed "
                                                             f"nor uncompressed corpus.")
                # we don't want to check sizes
                document_set.compressed_size_in_bytes = None
                document_set.uncompressed_size_in_bytes = None

    def _downsize_task(leaf_task):
        # Iteration-based schedules are divided among all clients, so keep at
        # least one iteration per client but no more than that.
        if leaf_task.warmup_iterations is not None and leaf_task.warmup_iterations > leaf_task.clients:
            count = leaf_task.clients
            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug("Resetting warmup iterations to %d for [%s]", count, str(leaf_task))
            leaf_task.warmup_iterations = count
        if leaf_task.iterations is not None and leaf_task.iterations > leaf_task.clients:
            count = leaf_task.clients
            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug("Resetting measurement iterations to %d for [%s]", count, str(leaf_task))
            leaf_task.iterations = count
        if leaf_task.warmup_time_period is not None and leaf_task.warmup_time_period > 0:
            leaf_task.warmup_time_period = 0
            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug("Resetting warmup time period for [%s] to [%d] seconds.",
                                  str(leaf_task), leaf_task.warmup_time_period)
        if leaf_task.time_period is not None and leaf_task.time_period > 10:
            leaf_task.time_period = 10
            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug("Resetting measurement time period for [%s] to [%d] seconds.",
                                  str(leaf_task), leaf_task.time_period)
        # Keep throttled to expose any errors but increase the target throughput for short execution times.
        if leaf_task.target_throughput:
            original_throughput = leaf_task.target_throughput
            leaf_task.params.pop("target-throughput", None)
            leaf_task.params.pop("target-interval", None)
            leaf_task.params["target-throughput"] = f"{sys.maxsize} {original_throughput.unit}"

    for corpus in workload.corpora:
        _downsize_corpus(corpus)
    for test_procedure in workload.test_procedures:
        for task in test_procedure.schedule:
            # We need to iterate over leaf tasks and avoid iterating over
            # possible intermediate 'parallel' container elements.
            for leaf_task in task:
                _downsize_task(leaf_task)
    return workload