def create_corpus_reader()

in elastic/shared/track_processors/data_generator.py


    def create_corpus_reader(self, corpus, num_clients, client_index, bulk_size, processor):
        readers = []
        for docs in corpus.documents:
            # Give each client a designated contiguous chunk of the file (see the partitioning sketch below).
            # Note: this results in out-of-order delivery across clients. Alternatively, each client could
            # consume every Nth line, where N is the number of clients, but that would likely cause more
            # seeking and lower performance.
            offset, num_docs = bounds(docs.number_of_documents, client_index, num_clients)
            if num_docs > 0:
                self.logger.info(
                    "Generator [%d] will read [%d] docs starting from line offset [%d] for [%s] from corpus [%s].",
                    client_index,
                    num_docs,
                    offset,
                    docs.target_data_stream,
                    corpus.name,
                )
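                # Memory-map the document file and restrict reads to this client's slice:
                # [num_docs] documents starting at line [offset].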
                source = WrappingSlice(io.MmapSource, offset, num_docs)
                readers.append(
                    JsonFileReader(
                        os.path.join(self.track_data_root, corpus.name, docs.document_file),
                        source,
                        processor,
                        docs.target_data_stream,
                        corpus.name,
                    )
                )
            else:
                self.logger.info(
                    "Generator [%d] skips [%s] (no documents to read).",
                    client_index,
                    corpus.name,
                )
        return CorpusReader(readers, bulk_size)
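The contiguous-chunk split computed by `bounds` is not shown here. Below is a minimal sketch of how such a partition typically works; the name `chunk_bounds` and its implementation are illustrative assumptions, not necessarily Rally's actual `bounds` function.

    def chunk_bounds(total_docs: int, client_index: int, num_clients: int) -> tuple[int, int]:
        # Split total_docs into num_clients contiguous chunks. Earlier clients
        # absorb the remainder, so every document is assigned exactly once.
        base, remainder = divmod(total_docs, num_clients)
        num_docs = base + (1 if client_index < remainder else 0)
        offset = client_index * base + min(client_index, remainder)
        return offset, num_docs

    # Example: 10 documents across 3 clients -> (0, 4), (4, 3), (7, 3)
    for i in range(3):
        print(chunk_bounds(10, i, 3))

Under this scheme each client reads one contiguous region, so its reads stay sequential, at the cost of the out-of-order delivery across clients noted in the comment above.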