in elastic/shared/track_processors/data_generator.py [0:0]
def _doc_generator(self):
    # there is a set of readers per corpus; for each corpus we consume its readers (and thus doc
    # sets) sequentially until exhausted and then reset them
    used_corpora = list(self._corpora_doc_ratios.keys())
    weights = list(self._corpora_doc_ratios.values())
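    # the per-corpus document ratios act as sampling weights for _reader_generator below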
    os.makedirs(self.output_folder, exist_ok=True)
    current_docs = 0
    current_lines = 0
    total_size_in_bytes = 0
    self.logger.info(
        "Generator [%d] is starting to generate [%d] docs",
        self._client_index,
        self.docs_per_client,
    )
    output_file = os.path.join(self.output_folder, f"{self._client_index}.json")
    offset_file_path = f"{output_file}.offset"
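    # every _offset_increment lines we checkpoint "<lines written>;<byte offset>" into the
    # companion .offset file, mapping line counts to positions in the data file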
    current_increment = self._offset_increment
    with open(output_file, "wt") as data_file, open(offset_file_path, mode="wt", encoding="utf-8") as offset_file:
        with CorporaReader(self.readers.values()):
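            # _reader_generator yields batches of (num_docs, lines, raw_size_in_bytes), sampling
            # across the used corpora according to their weights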
            for num_docs, lines, raw_size_in_bytes in self._reader_generator(used_corpora, weights):
                if current_docs + num_docs >= self.docs_per_client:
                    self.complete = True
                    # we deliberately don't trim the last batch to hit the exact number of documents:
                    # that would break encapsulation, and the overshoot shouldn't matter on larger datasets
                elif current_docs % 1000 == 0:
                    self.logger.debug(
                        "Generator [%d] has written [%d] docs so far.",
                        self._client_index,
                        current_docs,
                    )
                # batches may contain fewer lines than the batch size (never more), so once we pass the
                # current increment we record the lines written and the data file's byte offset, then
                # move the increment forward
                if current_lines >= current_increment:
                    print(
                        f"{current_lines};{data_file.tell()}",
                        file=offset_file,
                    )
                    current_increment += self._offset_increment
                current_docs += num_docs
                current_lines += len(lines)
                total_size_in_bytes += raw_size_in_bytes
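                # append the batch to the data file as JSON Lines, one serialized document per line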
                data_file.writelines([json.dumps(line) + "\n" for line in lines])
                if self.complete:
                    self.logger.info(
                        "Generator [%d] has completed generating [%d] docs with [%d] bytes.",
                        self._client_index,
                        current_docs,
                        total_size_in_bytes,
                    )
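                    # record the final document count and raw byte size for this client's generated file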
                    FileMetadata.write(
                        self.output_folder,
                        self._client_index,
                        current_docs,
                        total_size_in_bytes,
                    )
                    break
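
# Illustrative sketch (not part of the original module), assuming the "<lines>;<byte offset>"
# checkpoint format written above: a consumer could use the .offset file to seek close to a
# target line without re-reading the whole data file. The helper name is hypothetical.
def _seek_to_checkpoint(data_path, offset_path, target_line):
    # find the last checkpoint at or before target_line
    best_line, best_offset = 0, 0
    with open(offset_path, "rt", encoding="utf-8") as offsets:
        for entry in offsets:
            if not entry.strip():
                continue
            lines_written, byte_offset = map(int, entry.strip().split(";"))
            if lines_written > target_line:
                break
            best_line, best_offset = lines_written, byte_offset
    # position the data file at the checkpoint; the caller then skips the remaining
    # (target_line - best_line) lines
    data_file = open(data_path, "rt")
    data_file.seek(best_offset)
    return data_file, best_line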