in elastic/shared/parameter_sources/processed.py [0:0]
def _doc_generator(self, num_clients, client_index):
report_size = self.bulk_size * 10
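# report_size is expressed in documents: progress is logged roughly every 10 bulks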
# if each client needs more than the total number of docs in the corpus, we might as well have each
# client loop over all of them
if self.docs_per_client > self.total_corpus_docs:
self.corpus_reader = self.create_bulk_corpus_reader(
self.corpus,
self.lines_per_bulk,
self._processor,
num_clients=1,
client_index=0,
)
else:
# otherwise, each client gets its own section of the corpus files
self.corpus_reader = self.create_bulk_corpus_reader(
self.corpus,
self.lines_per_bulk,
self._processor,
num_clients,
client_index,
)
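# for example, with a 1M-doc corpus and 4 clients: a target of 2M docs per client means every client
# reads the whole corpus on its own, while 100k docs per client means each client only reads its
# quarter of the files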
params = self._params.copy()
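# work on a copy so the per-bulk fields set below don't mutate the shared parameter template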
with self.corpus_reader:
while not self._complete:
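# each call to the corpus reader returns one pre-processed bulk: its document count, the
# action/source lines, and the raw size in bytes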
num_docs, lines, raw_size_in_bytes = next(self.corpus_reader)
if self.current_docs + num_docs >= self.docs_per_client:
self._complete = True
self.logger.info("Completed with [%s] docs", self.docs_per_client)
# we deliberately don't trim the last batch to hit the exact number of documents: trimming would
# break encapsulation, and the small overshoot shouldn't be an issue on larger datasets
elif self.current_docs > 0 and self.current_docs % report_size == 0:
self.logger.debug("[%s] docs indexed", self.current_docs)
self.current_docs += num_docs
params["body"] = lines
params["unit"] = "docs"
params["action-metadata-present"] = True
params["bulk-size"] = num_docs
self.event_time_span = (self.max_timestamp - self.min_timestamp).total_seconds()
relative_time = int(time.perf_counter()) - self.start_time
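# index-lag is a rough indicator of whether the event time span covered by the corpus is ahead of
# (positive) or behind (negative) the wall-clock time spent generating so far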
params["param-source-stats"] = {
"client": client_index,
"raw-size-bytes": raw_size_in_bytes,
"event-time-span": self.event_time_span,
"relative-time": relative_time,
"index-lag": self.event_time_span - relative_time,
"min-timestamp": self.min_timestamp.isoformat(sep="T", timespec=self._time_format),
"max-timestamp": self.max_timestamp.isoformat(sep="T", timespec=self._time_format),
}
yield params
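
# --- Illustrative sketch (not part of processed.py) ---------------------------------------------
# A minimal, hedged example of how a generator like _doc_generator is typically consumed: each
# yielded dict describes one bulk request. The class name, constructor arguments, and sink below
# are hypothetical placeholders rather than the real class surrounding this method.
#
#   source = ProcessedCorpusParamSource(track, params)               # hypothetical name
#   generator = source._doc_generator(num_clients=2, client_index=0)
#   for bulk_params in generator:
#       # a runner would send bulk_params["body"] as one _bulk request and could surface
#       # bulk_params["param-source-stats"] for telemetry
#       handle(bulk_params)                                          # hypothetical sink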