def _doc_generator()

in elastic/shared/track_processors/data_generator.py [0:0]


    def _doc_generator(self):
        # there are a number of readers per corpus; for each corpus we consume the readers (and thus doc sets)
        # sequentially until exhausted and then reset them
        used_corpora = list(self._corpora_doc_ratios.keys())
        weights = list(self._corpora_doc_ratios.values())
        os.makedirs(self.output_folder, exist_ok=True)

        current_docs = 0
        current_lines = 0
        total_size_in_bytes = 0
        self.logger.info(
            "Generator [%d] is starting to generate [%d] docs",
            self._client_index,
            self.docs_per_client,
        )
        output_file = os.path.join(self.output_folder, f"{self._client_index}.json")
        offset_file_path = f"{output_file}.offset"
        current_increment = self._offset_increment
        with open(output_file, "wt") as data_file, open(offset_file_path, mode="wt", encoding="utf-8") as offset_file:
            with CorporaReader(self.readers.values()):
                for num_docs, lines, raw_size_in_bytes in self._reader_generator(used_corpora, weights):
                    if current_docs + num_docs >= self.docs_per_client:
                        self.complete = True
                        # we deliberately don't trim the last batch to hit the exact number of documents; that would
                        # break encapsulation, and the overshoot shouldn't be an issue on larger datasets
                    elif current_docs % 1000 == 0:
                        self.logger.debug(
                            "Generator [%d] has written [%d] docs so far.",
                            self._client_index,
                            current_docs,
                        )
                    # we get chunks of lines that may be smaller than the batch size (never larger), so we write an
                    # offset entry once we pass the increment and then shift the increment forward
                    if current_lines >= current_increment:
                        print(
                            "%d;%d" % (current_lines, data_file.tell()),
                            file=offset_file,
                        )
                        current_increment += self._offset_increment
                    current_docs += num_docs
                    current_lines += len(lines)
                    total_size_in_bytes += raw_size_in_bytes
                    data_file.writelines([json.dumps(line) + "\n" for line in lines])
                    if self.complete:
                        self.logger.info(
                            "Generator [%d] has completed generating [%d] docs with [%d] bytes.",
                            self._client_index,
                            current_docs,
                            total_size_in_bytes,
                        )
                        FileMetadata.write(
                            self.output_folder,
                            self._client_index,
                            current_docs,
                            total_size_in_bytes,
                        )
                        break
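
The comment at the top of the method describes the consumption pattern behind self._reader_generator, which is not part of this listing: corpora are picked according to self._corpora_doc_ratios, and each corpus' readers are consumed sequentially until exhausted and then reset. Below is a minimal, self-contained sketch of that pattern, assuming a random.choices-style weighted pick and per-corpus factory callables that recreate readers; the name weighted_batches, the reader_factories parameter, and the reset-via-itertools.cycle mechanism are illustrative assumptions, not the track's actual implementation.

    import random
    from itertools import cycle


    def weighted_batches(reader_factories, used_corpora, weights):
        """Yield (num_docs, lines, raw_size_in_bytes) batches, picking the corpus for
        each batch according to its document ratio.

        reader_factories maps a corpus name to a list of callables, each returning
        a fresh iterator over that corpus' batches."""
        # cycling the factories restarts a corpus' readers from the first one
        # once they have all been drained
        factories = {corpus: cycle(reader_factories[corpus]) for corpus in used_corpora}
        current = {corpus: iter(next(factories[corpus])()) for corpus in used_corpora}
        while True:
            corpus = random.choices(used_corpora, weights=weights, k=1)[0]
            try:
                yield next(current[corpus])
            except StopIteration:
                # this reader is exhausted; move on to (and eventually wrap back to) the next factory
                current[corpus] = iter(next(factories[corpus])())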
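
Each line of the offset file written above has the form "<lines>;<byte offset>", pairing a running line count with the byte position returned by data_file.tell() once that many lines are on disk, one entry per _offset_increment lines. Below is a sketch of how a consumer could use such a file to start reading near an arbitrary line without scanning the data file from the top; the function name read_docs_from and its 0-based start_line parameter are hypothetical.

    import json


    def read_docs_from(data_path, offset_path, start_line):
        """Yield parsed docs from data_path starting at the 0-based start_line,
        seeking via the offset file instead of reading from the beginning."""
        best_line, best_offset = 0, 0
        with open(offset_path, "rt", encoding="utf-8") as offsets:
            for entry in offsets:
                line_count, byte_offset = (int(part) for part in entry.strip().split(";"))
                # keep the last checkpoint that does not go past the target line
                if line_count > start_line:
                    break
                best_line, best_offset = line_count, byte_offset
        with open(data_path, "rt") as data:
            # byte offsets came from tell() on a text-mode handle, so seeking them back in text mode is valid
            data.seek(best_offset)
            # skip the remaining lines between the checkpoint and the target
            for _ in range(start_line - best_line):
                data.readline()
            for line in data:
                yield json.loads(line)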