def __init__()

in elastic/shared/track_processors/data_generator.py [0:0]


    def __init__(self, track, track_data_root, client_index="*", client_count=None):
        self.logger = logging.getLogger(__name__)
        self.include_doc_size_with_metadata = False
        self.corpora = []
        self.track = track
        self._random_seed = track.selected_challenge_or_default.parameters.get("random-seed", None)
        self.track_data_root = track_data_root
        # check if output folder exists and contains files. If it does, we complete early unless force=True
        self.output_folder = track.selected_challenge_or_default.parameters["output-folder"]
        self.logger.info("Using output folder [%s]", self.output_folder)
        self.complete = False
        if not track.selected_challenge_or_default.parameters.get("force-data-generation"):
            file_pattern = f"{client_index}.json"
            if len(glob.glob(os.path.join(self.output_folder, file_pattern))) > 0:
                self.complete = True
                self.logger.info("Skipping data generation as files are present and force-data-generation is set to false.")

        self._client_index = client_index
        self._client_count = client_count

        self._integration_ratios = mandatory(
            track.selected_challenge_or_default.parameters,
            "integration-ratios",
            "generate-data",
        )
        self._exclude_properties = track.selected_challenge_or_default.parameters.get("exclude-properties", {})

        end_date = parse_date_time(
            track.selected_challenge_or_default.parameters.get("end-date", DEFAULT_END_DATE),
            utcnow=utc_now,
        )
        start_date = parse_date_time(
            track.selected_challenge_or_default.parameters.get("start-date", DEFAULT_START_DATE),
            utcnow=utc_now,
        )

        # bulk-end-date and bulk-start-date are undocumented and optional. If specified we generate data for the widest
        # date range possible as it will likely be re-used for incremental load.
        if bulk_end_date := track.selected_challenge_or_default.parameters.get("bulk-end-date", None):
            if (bulk_end_date := parse_date_time(bulk_end_date, utcnow=utc_now)) > end_date:
                end_date = bulk_end_date

        if bulk_start_date := track.selected_challenge_or_default.parameters.get("bulk-start-date", None):
            if (bulk_start_date := parse_date_time(bulk_start_date, utcnow=utc_now)) < start_date:
                start_date = bulk_start_date

        self.logger.info(
            "Using date range [%s] to [%s] for generation",
            start_date.isoformat(),
            end_date.isoformat(),
        )

        if start_date > end_date:
            raise exceptions.TrackConfigError(f'"start-date" cannot be greater than "end-date" for data generation.')
        # number of days to run the test for - used to calculate the amount of data to generate
        number_of_days = (end_date - start_date).total_seconds() / 86400
        self._max_generation_size_gb = convert_to_gib(
            mandatory(
                track.selected_challenge_or_default.parameters,
                "max-generated-corpus-size",
                "generate-data",
            )
        )
        raw_volume_per_day = mandatory(
            track.selected_challenge_or_default.parameters,
            "raw-data-volume-per-day",
            "generate-data",
        )
        self._data_generation_gb = min(
            convert_to_gib(raw_volume_per_day) * number_of_days,
            self._max_generation_size_gb,
        )
        self.logger.info("[%s]GB of raw data to be generated", self._data_generation_gb)
        self._processor = self._json_processor

        # mainly for unit testing but can be modified and might be worth making this a factor of num clients later
        self._offset_increment = track.selected_challenge_or_default.parameters.get("offset-increment", 50000)
        self._batch_size = mandatory(
            track.selected_challenge_or_default.parameters,
            "generator-batch-size",
            "generate-data",
        )
        # batch size cannot be greater than the offset increment or we will not generate offsets
        if self._batch_size > self._offset_increment:
            raise exceptions.RallyAssertionError(
                f"generator-batch-size [{self._batch_size}] cannot be greater than offset-increment " f"[{self._offset_increment}]"
            )
        ratio_total = 0
        # number of lines to sample from each corpus to identify the raw->json factor each corpus.
        # Principally for testing.
        self._sample_size = track.selected_challenge_or_default.parameters.get("sample-size", 10000)
        for integration_name, integration in self._integration_ratios.items():
            for corpus_name, ratio in integration["corpora"].items():
                corpus = next((c for c in track.corpora if c.name == corpus_name), None)
                self.corpora.append(corpus)
                if not corpus:
                    raise exceptions.RallyAssertionError(
                        f"Corpus [{corpus_name}] is defined for data generation in integration [{integration_name}] "
                        f"but is not present in track"
                    )
                ratio_total += ratio

        # after ratio_total is calculated, recalculate all integration ratios to total up to 1
        if round(ratio_total, 3) != 1.0:
            self.logger.info(f"Total corpus ratios {ratio_total} != 1.0, recalculating corpus ratios")
            for integration_name, integration in self._integration_ratios.items():
                for corpus_name, ratio in integration["corpora"].items():
                    integration["corpora"][corpus_name] = ratio / ratio_total
            self.logger.info(f"Corpus ratios recalculated to:\n {self._integration_ratios}")