in esrally/track/loader.py [0:0]
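Note: this excerpt relies on module-level imports in loader.py; at minimum logging, sys, and Rally's exceptions and io helper modules (esrally.exceptions, esrally.utils.io) are assumed to be in scope.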
def on_after_load_track(self, track):
    if not self.test_mode_enabled:
        return track
    self.logger.info("Preparing track [%s] for test mode.", str(track))
    for corpus in track.corpora:
        for document_set in corpus.documents:
            # TODO #341: Should we allow this for snapshots too?
            if document_set.is_bulk:
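                # test mode only ingests a small sample: cap every bulk corpus at 1,000 documents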
                if document_set.number_of_documents > 1000:
                    if self.logger.isEnabledFor(logging.DEBUG):
                        self.logger.debug(
                            "Reducing corpus size to 1000 documents in corpus [%s], uncompressed source file [%s]",
                            corpus.name,
                            document_set.document_file,
                        )
                    document_set.number_of_documents = 1000
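                    # rewrite the corpus file names to the 1,000-document "-1k" variants that
                    # tracks are expected to provide alongside the full corpus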
                    if document_set.has_compressed_corpus():
                        # split twice to peel off both the compression suffix and the data suffix,
                        # e.g. "docs.json.bz2" -> ("docs.json", ".bz2") -> ("docs", ".json")
                        path, ext = io.splitext(document_set.document_archive)
                        path_2, ext_2 = io.splitext(path)
                        document_set.document_archive = f"{path_2}-1k{ext_2}{ext}"
                        document_set.document_file = f"{path_2}-1k{ext_2}"
                    elif document_set.has_uncompressed_corpus():
                        path, ext = io.splitext(document_set.document_file)
                        document_set.document_file = f"{path}-1k{ext}"
                    else:
                        raise exceptions.RallyAssertionError(
                            f"Document corpus [{corpus.name}] has neither compressed nor uncompressed corpus."
                        )
                    # the truncated files differ in size from the full corpus, so don't verify sizes
                    document_set.compressed_size_in_bytes = None
                    document_set.uncompressed_size_in_bytes = None
                else:
                    if self.logger.isEnabledFor(logging.DEBUG):
                        self.logger.debug(
                            "Maintaining existing size of %d documents in corpus [%s], uncompressed source file [%s]",
                            document_set.number_of_documents,
                            corpus.name,
                            document_set.document_file,
                        )
    for challenge in track.challenges:
        for task in challenge.schedule:
            # we need to iterate over leaf tasks and avoid iterating over possible intermediate 'parallel' elements
            for leaf_task in task:
                # iteration-based schedules are divided among all clients and we should provide
                # at least one iteration for each client.
                if leaf_task.warmup_iterations is not None and leaf_task.warmup_iterations > leaf_task.clients:
                    count = leaf_task.clients
                    if self.logger.isEnabledFor(logging.DEBUG):
                        self.logger.debug("Resetting warmup iterations to %d for [%s]", count, str(leaf_task))
                    leaf_task.warmup_iterations = count
                if leaf_task.iterations is not None and leaf_task.iterations > leaf_task.clients:
                    count = leaf_task.clients
                    if self.logger.isEnabledFor(logging.DEBUG):
                        self.logger.debug("Resetting measurement iterations to %d for [%s]", count, str(leaf_task))
                    leaf_task.iterations = count
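                # time-based schedules are shortened as well: skip warmup entirely and
                # measure for at most 10 seconds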
                if leaf_task.warmup_time_period is not None and leaf_task.warmup_time_period > 0:
                    leaf_task.warmup_time_period = 0
                    if self.logger.isEnabledFor(logging.DEBUG):
                        self.logger.debug(
                            "Resetting warmup time period for [%s] to [%d] seconds.", str(leaf_task), leaf_task.warmup_time_period
                        )
                if leaf_task.time_period is not None and leaf_task.time_period > 10:
                    leaf_task.time_period = 10
                    if self.logger.isEnabledFor(logging.DEBUG):
                        self.logger.debug(
                            "Resetting measurement time period for [%s] to [%d] seconds.", str(leaf_task), leaf_task.time_period
                        )
                # Keep throttled to expose any errors but increase the target throughput for short execution times.
                if leaf_task.target_throughput:
                    original_throughput = leaf_task.target_throughput
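                    # "target-throughput" and "target-interval" are alternative ways to express the
                    # same limit, so clear both before setting the effectively-unthrottled value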
                    leaf_task.params.pop("target-throughput", None)
                    leaf_task.params.pop("target-interval", None)
                    leaf_task.params["target-throughput"] = f"{sys.maxsize} {original_throughput.unit}"
    return track
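
A minimal standalone sketch of the file-name rewriting above, using os.path.splitext under the assumption that Rally's io.splitext behaves the same for a plain .bz2 suffix; the corpus name "documents.json.bz2" is hypothetical:

import os.path

archive = "documents.json.bz2"
path, ext = os.path.splitext(archive)    # ("documents.json", ".bz2")
path_2, ext_2 = os.path.splitext(path)   # ("documents", ".json")
print(f"{path_2}-1k{ext_2}{ext}")        # documents-1k.json.bz2 (compressed test-mode archive)
print(f"{path_2}-1k{ext_2}")             # documents-1k.json     (uncompressed test-mode file)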