in elastic/shared/track_processors/data_generator.py
def create_corpus_reader(self, corpus, num_clients, client_index, bulk_size, processor):
    readers = []
    for docs in corpus.documents:
        # Give each client a designated, contiguous chunk of the file to read. Note: this results in
        # out-of-order delivery. Alternatively, each client could consume every Nth line, where N is the
        # number of clients, but that would likely cause more seeking and lower performance.
        offset, num_docs = bounds(docs.number_of_documents, client_index, num_clients)
        if num_docs > 0:
            self.logger.info(
                "Generator [%d] will read [%d] docs starting from line offset [%d] for [%s] from corpus [%s].",
                client_index,
                num_docs,
                offset,
                docs.target_data_stream,
                corpus.name,
            )
            source = WrappingSlice(io.MmapSource, offset, num_docs)
            readers.append(
                JsonFileReader(
                    os.path.join(self.track_data_root, corpus.name, docs.document_file),
                    source,
                    processor,
                    docs.target_data_stream,
                    corpus.name,
                )
            )
        else:
            self.logger.info(
                "Generator [%d] skips [%s] (no documents to read).",
                client_index,
                corpus.name,
            )
    return CorpusReader(readers, bulk_size)
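

# The chunking above relies on a bounds() helper defined elsewhere in the repository; its
# implementation is not shown here. The following is a hypothetical sketch (an assumption,
# not the actual helper) of how a document count could be split into contiguous per-client
# slices, returning the (line offset, number of documents) pair consumed above.
def bounds(total_docs, client_index, num_clients):
    # Base chunk size, with the remainder spread across the lowest-indexed clients so
    # chunk sizes differ by at most one document and the whole corpus is covered.
    base, remainder = divmod(total_docs, num_clients)
    num_docs = base + (1 if client_index < remainder else 0)
    offset = client_index * base + min(client_index, remainder)
    return offset, num_docs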