in elastic/shared/parameter_sources/processed.py [0:0]
def create_bulk_corpus_reader(self, corpus, bulk_size, processor, num_clients, client_index):
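"""
Creates a reader over all document files in the corpus for the clients assigned to this task.

Each document file is split evenly across the clients; a file for which this client has no
documents to read is skipped. Returns a CorpusReader that yields bulks of ``bulk_size`` from
the per-file BulkFileReader instances.
"""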
readers = []
for docs in corpus.documents:
# offsets must be even, otherwise a document and its action/metadata line would be split across clients
self.logger.debug(
"Task-relative clients at index [%d-%d] using [%s] which has [%d] documents from corpus " "name [%s]",
client_index,
num_clients,
docs.document_file,
docs.number_of_documents,
corpus.name,
)
offset, num_docs = bounds(docs.number_of_documents, client_index, num_clients, ensure_even=True)
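# this client's slice of the file: the starting document offset and the number of documents to read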
if num_docs > 0:
self.logger.debug(
"Task-relative clients at index [%d-%d] will read [%d] docs starting from line offset "
"[%d] for [%s] from corpus [%s] at file[%s].",
client_index,
num_clients,
num_docs,
offset,
docs.target_data_stream,
corpus.name,
docs.document_file,
)
# multiply offset and num_docs by 2 to account for the action/metadata line preceding each document
source = WrappingSlice(io.MmapSource, offset * 2, num_docs * 2)
readers.append(BulkFileReader(docs.document_file, source, processor, corpus.name))
else:
self.logger.info(
"Task-relative clients at index [%d-%d] skip [%s] (no documents to read).",
client_index,
num_clients,
corpus.name,
)
return CorpusReader(readers, bulk_size)