in esrally/track/loader.py [0:0]
def _create_corpora(self, corpora_specs, indices, data_streams):
    """
    Creates the document corpora described by ``corpora_specs``.

    Values given on corpus level (``base-url``, ``source-format``,
    ``includes-action-and-meta-data`` and the targets) serve as defaults for
    the individual document sets of that corpus. If exactly one index or
    exactly one data stream is passed, its name is the default target; if
    that single index has exactly one type, that type is the default
    ``target-type``.

    All properties are read via ``self._r`` which is defined outside of this
    block. NOTE(review): presumably ``self._r`` rejects missing mandatory
    properties and ``self._error`` raises - confirm against their definitions.

    :param corpora_specs: An iterable of corpus specifications.
    :param indices: The indices in use. Must be empty if ``data_streams`` is not.
    :param data_streams: The data streams in use. Must be empty if ``indices`` is not.
    :return: A list with one ``track.DocumentCorpus`` per corpus specification,
             in the order of ``corpora_specs``.
    :raises TrackSyntaxError: If indices and data streams are both specified, if a
             document set targets the wrong kind (index vs. data stream), if
             ``target-type`` is combined with a data stream or if a document set
             without action-and-meta-data ends up with no target at all.
    """
    if len(indices) > 0 and len(data_streams) > 0:
        raise TrackSyntaxError("indices and data-streams cannot both be specified")

    corpora = []
    seen_corpus_names = set()

    for corpus_spec in corpora_specs:
        name = self._r(corpus_spec, "name")
        if name in seen_corpus_names:
            self._error("Duplicate document corpus name [%s]." % name)
        seen_corpus_names.add(name)

        corpus_meta = self._r(corpus_spec, "meta", error_ctx=name, mandatory=False)
        corpus = track.DocumentCorpus(name=name, meta_data=corpus_meta)

        # corpus-wide fallbacks for properties that each document set may override
        fallback_base_url = self._r(corpus_spec, "base-url", mandatory=False, default_value=None)
        fallback_source_format = self._r(
            corpus_spec, "source-format", mandatory=False, default_value=track.Documents.SOURCE_FORMAT_BULK
        )
        fallback_action_and_meta_data = self._r(
            corpus_spec, "includes-action-and-meta-data", mandatory=False, default_value=False
        )

        # a single index is an implicit default target; with several indices only an explicit one counts
        if len(indices) == 1:
            fallback_index = self._r(corpus_spec, "target-index", mandatory=False, default_value=indices[0].name)
        elif len(indices) > 0:
            fallback_index = self._r(corpus_spec, "target-index", mandatory=False)
        else:
            fallback_index = None

        # same rule for data streams
        if len(data_streams) == 1:
            fallback_data_stream = self._r(
                corpus_spec, "target-data-stream", mandatory=False, default_value=data_streams[0].name
            )
        elif len(data_streams) > 0:
            fallback_data_stream = self._r(corpus_spec, "target-data-stream", mandatory=False)
        else:
            fallback_data_stream = None

        # the type can only be derived if there is exactly one index with exactly one type
        if len(indices) == 1 and len(indices[0].types) == 1:
            fallback_type = self._r(corpus_spec, "target-type", mandatory=False, default_value=indices[0].types[0])
        elif len(indices) > 0:
            fallback_type = self._r(corpus_spec, "target-type", mandatory=False)
        else:
            fallback_type = None

        for doc_spec in self._r(corpus_spec, "documents"):
            base_url = self._r(doc_spec, "base-url", mandatory=False, default_value=fallback_base_url)
            source_format = self._r(doc_spec, "source-format", mandatory=False, default_value=fallback_source_format)

            # bulk is the only source format handled here
            if source_format != track.Documents.SOURCE_FORMAT_BULK:
                self._error("Unknown source-format [%s] in document corpus [%s]." % (source_format, name))
                continue

            docs = self._r(doc_spec, "source-file")
            if io.is_archive(docs):
                # the plain file name is the archive name without its extension
                archive_name, file_name = docs, io.splitext(docs)[0]
            else:
                archive_name, file_name = None, docs

            doc_count = self._r(doc_spec, "document-count")
            compressed_size = self._r(doc_spec, "compressed-bytes", mandatory=False)
            uncompressed_size = self._r(doc_spec, "uncompressed-bytes", mandatory=False)
            doc_meta = self._r(doc_spec, "meta", error_ctx=name, mandatory=False)
            with_action_and_meta_data = self._r(
                doc_spec, "includes-action-and-meta-data", mandatory=False, default_value=fallback_action_and_meta_data
            )

            # no targets are resolved when the source file already includes action-and-meta-data
            target_idx = target_type = target_ds = None
            if not with_action_and_meta_data:
                target_type = self._r(
                    doc_spec, "target-type", mandatory=False, default_value=fallback_type, error_ctx=docs
                )
                # require to be specified if we're using data streams and we have no default
                target_ds = self._r(
                    doc_spec,
                    "target-data-stream",
                    mandatory=len(data_streams) > 0 and fallback_data_stream is None,
                    default_value=fallback_data_stream,
                    error_ctx=docs,
                )
                # if indices are in use we error
                if target_ds and len(indices) > 0:
                    raise TrackSyntaxError("target-data-stream cannot be used when using indices")
                if target_ds and target_type:
                    raise TrackSyntaxError("target-type cannot be used when using data-streams")
                # need an index if we're using indices and no meta-data are present and we don't have a default
                target_idx = self._r(
                    doc_spec,
                    "target-index",
                    mandatory=len(indices) > 0 and fallback_index is None,
                    default_value=fallback_index,
                    error_ctx=docs,
                )
                # if data streams are in use we error
                if target_idx and len(data_streams) > 0:
                    raise TrackSyntaxError("target-index cannot be used when using data-streams")
                # we need one or the other
                if target_idx is None and target_ds is None:
                    raise TrackSyntaxError(
                        f"a {'target-index' if len(indices) > 0 else 'target-data-stream'} is required for {docs}"
                    )

            documents = track.Documents(
                source_format=source_format,
                document_file=file_name,
                document_archive=archive_name,
                base_url=base_url,
                includes_action_and_meta_data=with_action_and_meta_data,
                number_of_documents=doc_count,
                compressed_size_in_bytes=compressed_size,
                uncompressed_size_in_bytes=uncompressed_size,
                target_index=target_idx,
                target_type=target_type,
                target_data_stream=target_ds,
                meta_data=doc_meta,
            )
            corpus.documents.append(documents)

        corpora.append(corpus)

    return corpora