in elastic/shared/track_processors/data_generator.py [0:0]
def __init__(self, track, track_data_root, client_index="*", client_count=None):
self.logger = logging.getLogger(__name__)
self.include_doc_size_with_metadata = False
self.corpora = []
self.track = track
self._random_seed = track.selected_challenge_or_default.parameters.get("random-seed", None)
self.track_data_root = track_data_root
# check whether the output folder already contains generated files; if it does, complete early unless force-data-generation is set
self.output_folder = track.selected_challenge_or_default.parameters["output-folder"]
self.logger.info("Using output folder [%s]", self.output_folder)
self.complete = False
if not track.selected_challenge_or_default.parameters.get("force-data-generation"):
file_pattern = f"{client_index}.json"
if len(glob.glob(os.path.join(self.output_folder, file_pattern))) > 0:
self.complete = True
self.logger.info("Skipping data generation as files are present and force-data-generation is set to false.")
self._client_index = client_index
self._client_count = client_count
self._integration_ratios = mandatory(
track.selected_challenge_or_default.parameters,
"integration-ratios",
"generate-data",
)
self._exclude_properties = track.selected_challenge_or_default.parameters.get("exclude-properties", {})
end_date = parse_date_time(
track.selected_challenge_or_default.parameters.get("end-date", DEFAULT_END_DATE),
utcnow=utc_now,
)
start_date = parse_date_time(
track.selected_challenge_or_default.parameters.get("start-date", DEFAULT_START_DATE),
utcnow=utc_now,
)
# bulk-end-date and bulk-start-date are undocumented and optional. If specified, we generate data for the widest
# possible date range, as it will likely be re-used for incremental load.
if bulk_end_date := track.selected_challenge_or_default.parameters.get("bulk-end-date", None):
if (bulk_end_date := parse_date_time(bulk_end_date, utcnow=utc_now)) > end_date:
end_date = bulk_end_date
if bulk_start_date := track.selected_challenge_or_default.parameters.get("bulk-start-date", None):
if (bulk_start_date := parse_date_time(bulk_start_date, utcnow=utc_now)) < start_date:
start_date = bulk_start_date
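# example (hypothetical values): start-date=2020-01-02 with bulk-start-date=2020-01-01 widens the range
# to begin at 2020-01-01; a bulk-end-date earlier than end-date is ignored, since only the wider bound wins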
self.logger.info(
"Using date range [%s] to [%s] for generation",
start_date.isoformat(),
end_date.isoformat(),
)
if start_date > end_date:
raise exceptions.TrackConfigError(
f'"start-date" [{start_date.isoformat()}] cannot be greater than "end-date" [{end_date.isoformat()}] for data generation.'
)
# number of days to run the test for - used to calculate the amount of data to generate
number_of_days = (end_date - start_date).total_seconds() / 86400
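# e.g. a 36-hour range yields 129600 / 86400 = 1.5 days; fractional days are kept so that
# partial days still contribute to the volume calculation below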
self._max_generation_size_gb = convert_to_gib(
mandatory(
track.selected_challenge_or_default.parameters,
"max-generated-corpus-size",
"generate-data",
)
)
raw_volume_per_day = mandatory(
track.selected_challenge_or_default.parameters,
"raw-data-volume-per-day",
"generate-data",
)
self._data_generation_gb = min(
convert_to_gib(raw_volume_per_day) * number_of_days,
self._max_generation_size_gb,
)
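# worked example (hypothetical parameters): raw-data-volume-per-day="100GB" over 1.5 days with
# max-generated-corpus-size="120GB" gives min(100 * 1.5, 120) = 120 GB to generate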
self.logger.info("[%s]GB of raw data to be generated", self._data_generation_gb)
self._processor = self._json_processor
# mainly for unit testing, but can be overridden; might be worth making this a factor of the number of clients later
self._offset_increment = track.selected_challenge_or_default.parameters.get("offset-increment", 50000)
self._batch_size = mandatory(
track.selected_challenge_or_default.parameters,
"generator-batch-size",
"generate-data",
)
# the batch size cannot be greater than the offset increment, or we will not generate offsets
if self._batch_size > self._offset_increment:
raise exceptions.RallyAssertionError(
f"generator-batch-size [{self._batch_size}] cannot be greater than offset-increment [{self._offset_increment}]"
)
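# e.g. with the default offset-increment of 50000, a generator-batch-size of 50000 is accepted,
# while 60000 would raise the assertion above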
ratio_total = 0
# number of lines to sample from each corpus to identify its raw->JSON conversion factor.
# Principally for testing.
self._sample_size = track.selected_challenge_or_default.parameters.get("sample-size", 10000)
for integration_name, integration in self._integration_ratios.items():
for corpus_name, ratio in integration["corpora"].items():
corpus = next((c for c in track.corpora if c.name == corpus_name), None)
if not corpus:
raise exceptions.RallyAssertionError(
f"Corpus [{corpus_name}] is defined for data generation in integration [{integration_name}] but is not present in the track"
)
self.corpora.append(corpus)
ratio_total += ratio
# if the configured ratios do not total 1.0, normalize them so they sum to 1
if round(ratio_total, 3) != 1.0:
self.logger.info("Total corpus ratios [%s] != 1.0, recalculating corpus ratios", ratio_total)
for integration_name, integration in self._integration_ratios.items():
for corpus_name, ratio in integration["corpora"].items():
integration["corpora"][corpus_name] = ratio / ratio_total
self.logger.info(f"Corpus ratios recalculated to:\n {self._integration_ratios}")