def _doc_generator()

in elastic/shared/parameter_sources/processed.py


    def _doc_generator(self, num_clients, client_index):
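        # report indexing progress every 10 bulks' worth of documents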
        report_size = self.bulk_size * 10
        # if each client needs more docs than the corpus contains in total, we might as well have every client loop
        # over all of them
        if self.docs_per_client > self.total_corpus_docs:
            self.corpus_reader = self.create_bulk_corpus_reader(
                self.corpus,
                self.lines_per_bulk,
                self._processor,
                num_clients=1,
                client_index=0,
            )
        else:
            # each client gets a section of the files
            self.corpus_reader = self.create_bulk_corpus_reader(
                self.corpus,
                self.lines_per_bulk,
                self._processor,
                num_clients,
                client_index,
            )
        params = self._params.copy()
        with self.corpus_reader:
            while not self._complete:
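                # each read yields one bulk: document count, request-body lines, and the raw size in bytes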
                num_docs, lines, raw_size_in_bytes = next(self.corpus_reader)
                if self.current_docs + num_docs >= self.docs_per_client:
                    self._complete = True
                    self.logger.info("Completed with [%s] docs", self.docs_per_client)
                    # we deliberately don't trim the last batch to hit the exact number of documents: trimming would
                    # break encapsulation, and the small overshoot shouldn't matter on larger datasets
                elif self.current_docs > 0 and self.current_docs % report_size == 0:
                    self.logger.debug("[%s] docs indexed", self.current_docs)
                self.current_docs += num_docs
                params["body"] = lines
                params["unit"] = "docs"
                params["action-metadata-present"] = True
                params["bulk-size"] = num_docs
                self.event_time_span = (self.max_timestamp - self.min_timestamp).total_seconds()
                relative_time = int(time.perf_counter()) - self.start_time
                params["param-source-stats"] = {
                    "client": client_index,
                    "raw-size-bytes": raw_size_in_bytes,
                    "event-time-span": self.event_time_span,
                    "relative-time": relative_time,
                    "index-lag": self.event_time_span - relative_time,
                    "min-timestamp": self.min_timestamp.isoformat(sep="T", timespec=self._time_format),
                    "max-timestamp": self.max_timestamp.isoformat(sep="T", timespec=self._time_format),
                }
                yield params
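
For context, this generator presumably backs a Rally custom parameter source (the file lives under parameter_sources/): each simulated client receives its own partition, and every call to params() supplies the parameters for one bulk request. The sketch below is a hypothetical illustration of that contract with stand-in names and a dummy generator, not the actual class in processed.py.

    # Hypothetical sketch of the Rally parameter-source contract a generator like
    # _doc_generator() plugs into; names and bodies are illustrative stand-ins.
    class BulkParamSourceSketch:
        def __init__(self):
            self.num_clients = 1
            self.client_index = 0
            self._generator = None

        def partition(self, partition_index, total_partitions):
            # Rally calls partition() once per simulated client so the instance
            # knows which slice of the corpus it should read
            self.client_index = partition_index
            self.num_clients = total_partitions
            return self

        def params(self):
            # Rally calls params() before each bulk request; every call returns one
            # parameter dict, mirroring the "yield params" above
            if self._generator is None:
                self._generator = self._doc_generator(self.num_clients, self.client_index)
            return next(self._generator)

        def _doc_generator(self, num_clients, client_index):
            # stand-in generator: two tiny bulks instead of reading a real corpus
            for i in range(2):
                yield {
                    "body": ['{"index": {}}', '{"value": %d}' % i],
                    "unit": "docs",
                    "action-metadata-present": True,
                    "bulk-size": 1,
                }

    # usage: one client, pull the parameters for the first bulk
    source = BulkParamSourceSketch().partition(0, 1)
    assert source.params()["bulk-size"] == 1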