in esrally/track/params.py [0:0]
def __init__(self, track, params, **kwargs):
    """Parse and validate the parameters of a bulk-indexing operation.

    Reads the id-conflict settings (``conflicts``, ``conflict-probability``,
    ``on-conflict``, ``recency``), resolves the document corpora to index,
    validates ``bulk-size``, ``batch-size`` and ``ingest-percentage``, and
    finally constructs the ``PartitionBulkIndexParamSource`` that receives
    all of these settings.

    :param track: The track; its corpora are resolved via ``used_corpora``
        and it is named in the error message when none are found.
    :param params: Operation parameters (read with ``.get`` and ``[]``).
    :param kwargs: Forwarded unchanged to the base class constructor.
    :raises exceptions.InvalidSyntax: For unknown, missing, non-numeric or
        inconsistent settings validated here. Numeric range checks are
        delegated to ``float_param``.
    """
    super().__init__(track, params, **kwargs)

    # --- id conflicts ----------------------------------------------------
    id_conflicts = params.get("conflicts", None)
    if not id_conflicts:
        # Absent, None or empty: no conflicting ids are generated.
        self.id_conflicts = IndexIdConflict.NoConflicts
    elif id_conflicts == "sequential":
        self.id_conflicts = IndexIdConflict.SequentialConflicts
    elif id_conflicts == "random":
        self.id_conflicts = IndexIdConflict.RandomConflicts
    else:
        raise exceptions.InvalidSyntax(f"Unknown 'conflicts' setting [{id_conflicts}]")
    conflicts_enabled = self.id_conflicts != IndexIdConflict.NoConflicts

    if "data-streams" in params and conflicts_enabled:
        raise exceptions.InvalidSyntax("'conflicts' cannot be used with 'data-streams'")

    if conflicts_enabled:
        # min_operator=operator.lt rejects only values strictly below min_value,
        # so the lower bound itself (0) is accepted.
        self.conflict_probability = self.float_param(
            params, name="conflict-probability", default_value=25, min_value=0, max_value=100, min_operator=operator.lt
        )
        self.on_conflict = params.get("on-conflict", "index")
        if self.on_conflict not in ["index", "update"]:
            raise exceptions.InvalidSyntax(f"Unknown 'on-conflict' setting [{self.on_conflict}]")
        self.recency = self.float_param(params, name="recency", default_value=0, min_value=0, max_value=1, min_operator=operator.lt)
    else:
        # Conflict-only settings are meaningless without conflicts.
        self.conflict_probability = None
        self.on_conflict = None
        self.recency = None

    # --- corpora -----------------------------------------------------------
    self.corpora = self.used_corpora(track, params)
    if len(self.corpora) == 0:
        raise exceptions.InvalidSyntax(
            f"There is no document corpus definition for track {track}. You must add at "
            "least one before making bulk requests to Elasticsearch."
        )
    if conflicts_enabled:
        # Generated conflicting ids cannot be injected into documents that
        # already carry their own action-and-meta-data line.
        for corpus in self.corpora:
            for document_set in corpus.documents:
                if document_set.includes_action_and_meta_data:
                    file_name = document_set.document_archive if document_set.has_compressed_corpus() else document_set.document_file
                    raise exceptions.InvalidSyntax(
                        f"Cannot generate id conflicts [{id_conflicts}] as [{file_name}] in document corpus [{corpus}] already contains an "
                        "action and meta-data line."
                    )

    self.pipeline = params.get("pipeline", None)

    # --- bulk / batch sizes ------------------------------------------------
    try:
        self.bulk_size = int(params["bulk-size"])
    except KeyError:
        raise exceptions.InvalidSyntax("Mandatory parameter 'bulk-size' is missing") from None
    except (TypeError, ValueError):
        # TypeError covers values int() cannot take at all, e.g. null/None or a list.
        raise exceptions.InvalidSyntax("'bulk-size' must be numeric") from None
    if self.bulk_size <= 0:
        raise exceptions.InvalidSyntax(f"'bulk-size' must be positive but was {self.bulk_size}")

    try:
        # Defaults to one bulk per batch.
        self.batch_size = int(params.get("batch-size", self.bulk_size))
    except (TypeError, ValueError):
        raise exceptions.InvalidSyntax("'batch-size' must be numeric") from None
    if self.batch_size <= 0:
        raise exceptions.InvalidSyntax(f"'batch-size' must be positive but was {self.batch_size}")
    if self.batch_size < self.bulk_size:
        raise exceptions.InvalidSyntax("'batch-size' must be greater than or equal to 'bulk-size'")
    if self.batch_size % self.bulk_size != 0:
        raise exceptions.InvalidSyntax("'batch-size' must be a multiple of 'bulk-size'")

    # NOTE(review): unlike the two calls above, no min_operator is passed here,
    # so this relies on float_param's default for the lower-bound check -- confirm
    # whether 0 is meant to be accepted.
    self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100)
    self.refresh = params.get("refresh")
    self.looped = params.get("looped", False)

    self.param_source = PartitionBulkIndexParamSource(
        self.corpora,
        self.batch_size,
        self.bulk_size,
        self.ingest_percentage,
        self.id_conflicts,
        self.conflict_probability,
        self.on_conflict,
        self.recency,
        self.pipeline,
        self.refresh,
        self.looped,
        self._params,
    )