def _create_corpora()

in esrally/track/loader.py [0:0]


    def _create_corpora(self, corpora_specs, indices, data_streams):
        if len(indices) > 0 and len(data_streams) > 0:
            raise TrackSyntaxError("indices and data-streams cannot both be specified")
        document_corpora = []
        known_corpora_names = set()
        for corpus_spec in corpora_specs:
            name = self._r(corpus_spec, "name")

            if name in known_corpora_names:
                self._error("Duplicate document corpus name [%s]." % name)
            known_corpora_names.add(name)

            meta_data = self._r(corpus_spec, "meta", error_ctx=name, mandatory=False)
            corpus = track.DocumentCorpus(name=name, meta_data=meta_data)
            # defaults on corpus level
            default_base_url = self._r(corpus_spec, "base-url", mandatory=False, default_value=None)
            default_source_format = self._r(corpus_spec, "source-format", mandatory=False, default_value=track.Documents.SOURCE_FORMAT_BULK)
            default_action_and_meta_data = self._r(corpus_spec, "includes-action-and-meta-data", mandatory=False, default_value=False)
            corpus_target_idx = None
            corpus_target_ds = None
            corpus_target_type = None

            if len(indices) == 1:
                corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False, default_value=indices[0].name)
            elif len(indices) > 0:
                corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False)

            if len(data_streams) == 1:
                corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False, default_value=data_streams[0].name)
            elif len(data_streams) > 0:
                corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False)

            if len(indices) == 1 and len(indices[0].types) == 1:
                corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False, default_value=indices[0].types[0])
            elif len(indices) > 0:
                corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False)

            for doc_spec in self._r(corpus_spec, "documents"):
                base_url = self._r(doc_spec, "base-url", mandatory=False, default_value=default_base_url)
                source_format = self._r(doc_spec, "source-format", mandatory=False, default_value=default_source_format)

                if source_format == track.Documents.SOURCE_FORMAT_BULK:
                    docs = self._r(doc_spec, "source-file")
                    if io.is_archive(docs):
                        document_archive = docs
                        document_file = io.splitext(docs)[0]
                    else:
                        document_archive = None
                        document_file = docs
                    num_docs = self._r(doc_spec, "document-count")
                    compressed_bytes = self._r(doc_spec, "compressed-bytes", mandatory=False)
                    uncompressed_bytes = self._r(doc_spec, "uncompressed-bytes", mandatory=False)
                    doc_meta_data = self._r(doc_spec, "meta", error_ctx=name, mandatory=False)

                    includes_action_and_meta_data = self._r(
                        doc_spec, "includes-action-and-meta-data", mandatory=False, default_value=default_action_and_meta_data
                    )
                    if includes_action_and_meta_data:
                        target_idx = None
                        target_type = None
                        target_ds = None
                    else:
                        target_type = self._r(doc_spec, "target-type", mandatory=False, default_value=corpus_target_type, error_ctx=docs)

                        # require to be specified if we're using data streams and we have no default
                        target_ds = self._r(
                            doc_spec,
                            "target-data-stream",
                            mandatory=len(data_streams) > 0 and corpus_target_ds is None,
                            default_value=corpus_target_ds,
                            error_ctx=docs,
                        )
                        if target_ds and len(indices) > 0:
                            # if indices are in use we error
                            raise TrackSyntaxError("target-data-stream cannot be used when using indices")

                        if target_ds and target_type:
                            raise TrackSyntaxError("target-type cannot be used when using data-streams")

                        # need an index if we're using indices and no meta-data are present and we don't have a default
                        target_idx = self._r(
                            doc_spec,
                            "target-index",
                            mandatory=len(indices) > 0 and corpus_target_idx is None,
                            default_value=corpus_target_idx,
                            error_ctx=docs,
                        )
                        # either target_idx or target_ds
                        if target_idx and len(data_streams) > 0:
                            # if data streams are in use we error
                            raise TrackSyntaxError("target-index cannot be used when using data-streams")

                        # we need one or the other
                        if target_idx is None and target_ds is None:
                            raise TrackSyntaxError(
                                f"a {'target-index' if len(indices) > 0 else 'target-data-stream'} is required for {docs}"
                            )

                    docs = track.Documents(
                        source_format=source_format,
                        document_file=document_file,
                        document_archive=document_archive,
                        base_url=base_url,
                        includes_action_and_meta_data=includes_action_and_meta_data,
                        number_of_documents=num_docs,
                        compressed_size_in_bytes=compressed_bytes,
                        uncompressed_size_in_bytes=uncompressed_bytes,
                        target_index=target_idx,
                        target_type=target_type,
                        target_data_stream=target_ds,
                        meta_data=doc_meta_data,
                    )
                    corpus.documents.append(docs)
                else:
                    self._error("Unknown source-format [%s] in document corpus [%s]." % (source_format, name))
            document_corpora.append(corpus)
        return document_corpora