def create_bulk_corpus_reader()

in elastic/shared/parameter_sources/processed.py [0:0]


    def create_bulk_corpus_reader(self, corpus, bulk_size, processor, num_clients, client_index):
        readers = []
        for docs in corpus.documents:
            # we want even offsets, otherwise doc lines and their metadata lines will be separated
            self.logger.debug(
                "Task-relative clients at index [%d-%d] using [%s] which has [%d] documents from corpus " "name [%s]",
                client_index,
                num_clients,
                docs.document_file,
                docs.number_of_documents,
                corpus.name,
            )
            offset, num_docs = bounds(docs.number_of_documents, client_index, num_clients, ensure_even=True)
            if num_docs > 0:
                self.logger.debug(
                    "Task-relative clients at index [%d-%d] will read [%d] docs starting from line offset "
                    "[%d] for [%s] from corpus [%s] at file[%s].",
                    client_index,
                    num_clients,
                    num_docs,
                    offset,
                    docs.target_data_stream,
                    corpus.name,
                    docs.document_file,
                )
                # multiply offset and num docs by 2 to account for the metadata lines
                source = WrappingSlice(io.MmapSource, offset * 2, num_docs * 2)
                readers.append(BulkFileReader(docs.document_file, source, processor, corpus.name))
            else:
                self.logger.info(
                    "Task-relative clients at index [%d-%d] skip [%s] (no documents to read).",
                    client_index,
                    num_clients,
                    corpus.name,
                )
        return CorpusReader(readers, bulk_size)
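
The offset and count are doubled because each document in the bulk file occupies two lines: an action-and-metadata line followed by the document source line, per the Elasticsearch bulk NDJSON format. Below is a minimal sketch of the per-client split and the document-to-line mapping, assuming `bounds` returns a document-level (offset, num_docs) pair; `even_bounds` is a hypothetical stand-in for illustration, not the real helper:

    def even_bounds(total_docs, client_index, num_clients):
        # Hypothetical stand-in for bounds(..., ensure_even=True): give each
        # client a contiguous range of whole documents so that no metadata
        # line ends up separated from its document line.
        docs_per_client = total_docs // num_clients
        offset = client_index * docs_per_client
        # The last client absorbs any remainder.
        num_docs = total_docs - offset if client_index == num_clients - 1 else docs_per_client
        return offset, num_docs

    # Example: 10 documents split across 4 clients. Each document spans two
    # lines (metadata + source), hence the * 2 when slicing the file.
    for client in range(4):
        offset, num_docs = even_bounds(10, client, 4)
        print(f"client {client}: docs [{offset}, {offset + num_docs}), "
              f"lines [{offset * 2}, {(offset + num_docs) * 2})")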