esrally/track/track.py (883 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import collections
import numbers
import re
from enum import Enum, auto, unique
from esrally import exceptions
from esrally.utils import serverless
class Index:
    """
    Defines an index in Elasticsearch.

    Two indices are considered equal (and hash identically) iff they have the same name; ``body`` and ``types``
    do not take part in the comparison.
    """

    def __init__(self, name, body=None, types=None):
        """
        Creates a new index.

        :param name: The index name. Mandatory.
        :param body: A dict representation of the index body. Optional, defaults to an empty dict.
        :param types: A list of types. Should contain at least one type. Defaults to an empty list.
        """
        self.name = name
        # ``None`` defaults ensure that every instance gets its own fresh container.
        self.body = {} if body is None else body
        self.types = [] if types is None else types

    def matches(self, pattern):
        """
        :param pattern: An index name, one of the wildcards ``_all`` or ``*``, or ``None``.
        :return: True iff the pattern is ``None``, a wildcard or exactly the name of this index.
        """
        if pattern is None or pattern in ["_all", "*"]:
            return True
        return self.name == pattern

    def __str__(self):
        return self.name

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        # Guard against foreign types so that e.g. ``index == None`` is False instead of raising AttributeError.
        return isinstance(other, type(self)) and self.name == other.name
class DataStream:
    """
    Defines a data stream in Elasticsearch.

    Two data streams are considered equal (and hash identically) iff they have the same name.
    """

    def __init__(self, name):
        """
        Creates a new data stream.

        :param name: The data stream name. Mandatory.
        """
        self.name = name

    def matches(self, pattern):
        """
        :param pattern: A data stream name, one of the wildcards ``_all`` or ``*``, or ``None``.
        :return: True iff the pattern is ``None``, a wildcard or exactly the name of this data stream.
        """
        if pattern is None or pattern in ["_all", "*"]:
            return True
        return self.name == pattern

    def __str__(self):
        return self.name

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        # Guard against foreign types so that e.g. ``data_stream == None`` is False instead of raising AttributeError.
        return isinstance(other, type(self)) and self.name == other.name
class IndexTemplate:
    """
    Defines an index template in Elasticsearch.

    Two index templates are considered equal (and hash identically) iff they have the same name.
    """

    def __init__(self, name, pattern, content, delete_matching_indices=False):
        """
        Creates a new index template.

        :param name: Name of the index template. Mandatory.
        :param pattern: The index pattern to which the index template applies. Mandatory.
        :param content: The content of the corresponding template. Mandatory.
        :param delete_matching_indices: Delete all indices that match the pattern before the benchmark iff True.
        """
        self.name = name
        self.pattern = pattern
        self.content = content
        self.delete_matching_indices = delete_matching_indices

    def __str__(self, *args, **kwargs):
        return self.name

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        # Guard against foreign types so that e.g. ``template == None`` is False instead of raising AttributeError.
        return isinstance(other, type(self)) and self.name == other.name
class ComponentTemplate:
    """
    Defines a component template in Elasticsearch.

    Two component templates are considered equal (and hash identically) iff they have the same name.
    """

    def __init__(self, name, content):
        """
        Creates a new component template.

        :param name: Name of the component template. Mandatory.
        :param content: The content of the corresponding template. Mandatory.
        """
        self.name = name
        self.content = content

    def __str__(self, *args, **kwargs):
        return self.name

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        # Guard against foreign types so that e.g. ``template == None`` is False instead of raising AttributeError.
        return isinstance(other, type(self)) and self.name == other.name
class Documents:
    """
    Describes one set of benchmark documents (a document file and / or its archive) together with the bulk target
    that the documents should be sent to.
    """

    SOURCE_FORMAT_BULK = "bulk"

    def __init__(
        self,
        source_format,
        document_file=None,
        document_archive=None,
        base_url=None,
        includes_action_and_meta_data=False,
        number_of_documents=0,
        compressed_size_in_bytes=0,
        uncompressed_size_in_bytes=0,
        target_index=None,
        target_data_stream=None,
        target_type=None,
        meta_data=None,
    ):
        """

        :param source_format: The format of these documents. Mandatory.
        :param document_file: The file name of benchmark documents after decompression. Optional (e.g. for percolation we
                              just need a mapping but no documents)
        :param document_archive: The file name of the compressed benchmark document name on the remote server. Optional (e.g. for
                                 percolation we just need a mapping but no documents)
        :param base_url: The URL from which to load data if they are not available locally. Optional.
        :param includes_action_and_meta_data: True, if the source file already includes the action and meta-data line. False, if it only
                                              contains documents.
        :param number_of_documents: The number of documents in the benchmark document. Needed for proper progress reporting. Only needed if
                                    a document_archive is given.
        :param compressed_size_in_bytes: The compressed size in bytes of the benchmark document. Needed for verification of the download and
                                         user reporting. Only useful if a document_archive is given (optional but recommended to be set).
        :param uncompressed_size_in_bytes: The size in bytes of the benchmark document after decompressing it.
                                           Only useful if a document_archive is given (optional but recommended to be set).
        :param target_index: The index to target for bulk operations. May be ``None`` if ``includes_action_and_meta_data`` is ``False``.
        :param target_data_stream: The data stream to target for bulk operations.
                                   May be ``None`` if ``includes_action_and_meta_data`` is ``False``.
        :param target_type: The document type to target for bulk operations. May be ``None`` if ``includes_action_and_meta_data``
                            is ``False``.
        :param meta_data: A dict containing key-value pairs with additional meta-data describing documents. Optional.
        """
        self.source_format = source_format
        self.document_file = document_file
        self.document_archive = document_archive
        self.base_url = base_url
        self.includes_action_and_meta_data = includes_action_and_meta_data
        self._number_of_documents = number_of_documents
        self._compressed_size_in_bytes = compressed_size_in_bytes
        self._uncompressed_size_in_bytes = uncompressed_size_in_bytes
        self.target_index = target_index
        self.target_data_stream = target_data_stream
        self.target_type = target_type
        self.meta_data = meta_data or {}

    def has_compressed_corpus(self):
        """:return: True iff a document archive has been specified."""
        return self.document_archive is not None

    def has_uncompressed_corpus(self):
        """:return: True iff a document file has been specified."""
        return self.document_file is not None

    @property
    def number_of_documents(self):
        return self._number_of_documents

    @number_of_documents.setter
    def number_of_documents(self, value):
        self._number_of_documents = value

    @property
    def uncompressed_size_in_bytes(self):
        return self._uncompressed_size_in_bytes

    @uncompressed_size_in_bytes.setter
    def uncompressed_size_in_bytes(self, value):
        self._uncompressed_size_in_bytes = value

    @property
    def compressed_size_in_bytes(self):
        return self._compressed_size_in_bytes

    @compressed_size_in_bytes.setter
    def compressed_size_in_bytes(self, value):
        self._compressed_size_in_bytes = value

    @property
    def number_of_lines(self):
        """
        :return: The number of lines in the document file: two lines per document if each document is preceded by an
                 action and meta-data line, otherwise one line per document.
        """
        if self.includes_action_and_meta_data:
            return self.number_of_documents * 2
        return self.number_of_documents

    @property
    def is_bulk(self):
        return self.source_format == Documents.SOURCE_FORMAT_BULK

    def __str__(self):
        return "%s documents from %s" % (self.source_format, self.document_file)

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def _identity(self):
        """
        :return: All hashable properties that define the identity of this instance. Shared by ``__hash__`` and
                 ``__eq__`` so that the two cannot get out of sync (``meta_data`` is handled separately as it is a dict).
        """
        return (
            self.source_format,
            self.document_file,
            self.document_archive,
            self.base_url,
            self.includes_action_and_meta_data,
            self.number_of_documents,
            self.compressed_size_in_bytes,
            self.uncompressed_size_in_bytes,
            self.target_index,
            self.target_data_stream,
            self.target_type,
        )

    def __hash__(self):
        result = hash(frozenset(self.meta_data.items()))
        for prop in self._identity():
            result ^= hash(prop)
        return result

    def __eq__(self, othr):
        # ``target_index`` is part of the identity: documents that only differ in their target index are different.
        return isinstance(othr, type(self)) and self._identity() == othr._identity() and self.meta_data == othr.meta_data
class DocumentCorpus:
    """
    A named collection of ``Documents`` instances with optional meta-data.
    """

    def __init__(self, name, documents=None, meta_data=None):
        """

        :param name: The name of this document corpus. Mandatory.
        :param documents: A list of ``Documents`` instances that belong to this corpus.
        :param meta_data: A dict containing key-value pairs with additional meta-data describing this corpus. Optional.
        """
        self.name = name
        self.documents = documents or []
        self.meta_data = meta_data or {}

    def number_of_documents(self, source_format):
        """
        :param source_format: The source format to consider.
        :return: The total number of documents over all document sets with the given source format.
        """
        return sum(doc.number_of_documents for doc in self.documents if doc.source_format == source_format)

    def compressed_size_in_bytes(self, source_format):
        """
        :param source_format: The source format to consider.
        :return: The summed compressed size of all document sets or ``None`` as soon as one document set has a
                 different source format or no compressed size.
        """
        # NOTE(review): a document set with a *different* source format also yields ``None`` (unlike
        # ``number_of_documents`` which skips it) - confirm whether such document sets should be skipped instead.
        total = 0
        for doc in self.documents:
            if doc.source_format != source_format or doc.compressed_size_in_bytes is None:
                return None
            total += doc.compressed_size_in_bytes
        return total

    def uncompressed_size_in_bytes(self, source_format):
        """
        :param source_format: The source format to consider.
        :return: The summed uncompressed size of all document sets or ``None`` as soon as one document set has a
                 different source format or no uncompressed size.
        """
        # NOTE(review): see ``compressed_size_in_bytes`` regarding document sets with a different source format.
        total = 0
        for doc in self.documents:
            if doc.source_format != source_format or doc.uncompressed_size_in_bytes is None:
                return None
            total += doc.uncompressed_size_in_bytes
        return total

    def filter(self, source_format=None, target_indices=None, target_data_streams=None):
        """
        Creates a new corpus that contains only the matching document sets. A criterion that is ``None`` (or empty)
        is not applied.

        :param source_format: Keep only document sets with this source format.
        :param target_indices: Keep only document sets targeting one of these indices.
        :param target_data_streams: Keep only document sets targeting one of these data streams.
        :return: A new ``DocumentCorpus`` with the same name, a copy of the meta-data and the matching document sets.
        """

        def is_selected(d):
            if source_format and d.source_format != source_format:
                return False
            if target_indices and d.target_index not in target_indices:
                return False
            if target_data_streams and d.target_data_stream not in target_data_streams:
                return False
            return True

        return DocumentCorpus(self.name, [d for d in self.documents if is_selected(d)], meta_data=dict(self.meta_data))

    def union(self, other):
        """
        Creates a new corpus based on the current and the provided other corpus. This is not meant as a generic union
        of two arbitrary corpora but rather to unify the documents referenced by two instances of the same corpus. This
        is useful when two tasks reference different subsets of a corpus and a unified view (e.g. for downloading the
        appropriate document files) is required.

        :param other: The other corpus to unify with this one. Must have the same name and meta-data.
        :return: A document corpus instance with the same name and meta-data but with documents from both corpora.
                 Duplicates are removed; documents of this corpus come first and the relative order is kept.
        :raises exceptions.RallyAssertionError: If the names or the meta-data of both corpora differ.
        """
        if self.name != other.name:
            raise exceptions.RallyAssertionError(f"Corpora names differ: [{self.name}] and [{other.name}].")
        if self.meta_data != other.meta_data:
            raise exceptions.RallyAssertionError(f"Corpora meta-data differ: [{self.meta_data}] and [{other.meta_data}].")
        if self is other:
            return self
        # dict keys deduplicate like a set but retain insertion order which keeps the result deterministic
        unified = list(dict.fromkeys([*self.documents, *other.documents]))
        return DocumentCorpus(name=self.name, documents=unified, meta_data=dict(self.meta_data))

    def __str__(self):
        return self.name

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def __hash__(self):
        # ``documents`` is a list and thus not hashable itself; hash an immutable snapshot instead
        return hash(self.name) ^ hash(tuple(self.documents)) ^ hash(frozenset(self.meta_data.items()))

    def __eq__(self, othr):
        return isinstance(othr, type(self)) and (self.name, self.documents, self.meta_data) == (othr.name, othr.documents, othr.meta_data)
class Track:
    """
    A track defines the data set that is used. It corresponds loosely to a use case (e.g. logging, event processing, analytics, ...)
    """

    def __init__(
        self,
        name,
        description=None,
        meta_data=None,
        challenges=None,
        indices=None,
        data_streams=None,
        templates=None,
        composable_templates=None,
        component_templates=None,
        corpora=None,
        dependencies=None,
        has_plugins=False,
        root=None,
    ):
        """

        Creates a new track.

        :param name: A short, descriptive name for this track. As per convention, this name should be in lower-case without spaces.
        :param description: A description for this track (should be less than 80 characters).
        :param meta_data: An optional dict of meta-data elements to attach to each metrics record. Default: {}.
        :param challenges: A list of one or more challenges to use. Precondition: If the list is non-empty it contains exactly one element
        with its ``default`` property set to ``True``.
        :param indices: A list of indices for this track. May be None.
        :param data_streams: A list of data streams for this track. May be None.
        :param templates: A list of index templates for this track. May be None.
        :param composable_templates: A list of composable templates for this track. May be None.
        :param component_templates: A list of component templates for this track. May be None.
        :param corpora: A list of document corpus definitions for this track. May be None.
        :param dependencies: A list of dependencies of this track. May be None.
        :param has_plugins: True iff the track also defines plugins (e.g. custom runners or parameter sources).
        :param root: The absolute path to the directory containing the track file(s).
        """
        self.name = name
        self.meta_data = meta_data if meta_data else {}
        self.description = description if description is not None else ""
        self.challenges = challenges if challenges else []
        self.indices = indices if indices else []
        self.data_streams = data_streams if data_streams else []
        self.corpora = corpora if corpora else []
        self.templates = templates if templates else []
        self.composable_templates = composable_templates if composable_templates else []
        self.component_templates = component_templates if component_templates else []
        self.dependencies = dependencies if dependencies else []
        self.has_plugins = has_plugins
        self.root = root

    @property
    def default_challenge(self):
        """
        :return: The first challenge that is marked as default or ``None`` if there is none.
        """
        # ``None`` should only happen if we don't have any challenges
        return next((challenge for challenge in self.challenges if challenge.default), None)

    @property
    def selected_challenge(self):
        """
        :return: The first challenge that is marked as selected or ``None`` if there is none.
        """
        return next((challenge for challenge in self.challenges if challenge.selected), None)

    @property
    def selected_challenge_or_default(self):
        selected = self.selected_challenge
        return selected if selected else self.default_challenge

    def find_challenge_or_default(self, name):
        """
        :param name: The name of the challenge to find.
        :return: The challenge with the given name. The default challenge, if the name is "" or ``None``.
        """
        if name in [None, ""]:
            return self.default_challenge
        return self.find_challenge(name)

    def find_challenge(self, name):
        """
        :param name: The name of the challenge to find.
        :return: The challenge with the given name.
        :raises exceptions.InvalidName: If this track has no challenge with the given name.
        """
        for challenge in self.challenges:
            if challenge.name == name:
                return challenge
        raise exceptions.InvalidName("Unknown challenge [%s] for track [%s]" % (name, self.name))

    @property
    def number_of_documents(self):
        """
        :return: The total number of bulk documents over all corpora of this track.
        """
        # TODO #341: Improve API to let users define what they want (everything, just specific types, ...)
        return sum(corpus.number_of_documents(Documents.SOURCE_FORMAT_BULK) for corpus in self.corpora)

    @property
    def compressed_size_in_bytes(self):
        """
        :return: The summed compressed size of the bulk documents of all corpora or ``None`` as soon as one corpus
                 reports ``None``.
        """
        size = 0
        for corpus in self.corpora:
            # TODO #341: Improve API to let users define what they want (everything, just specific types, ...)
            curr_size = corpus.compressed_size_in_bytes(Documents.SOURCE_FORMAT_BULK)
            if curr_size is None:
                return None
            size += curr_size
        return size

    @property
    def uncompressed_size_in_bytes(self):
        """
        :return: The summed uncompressed size of the bulk documents of all corpora or ``None`` as soon as one corpus
                 reports ``None``.
        """
        size = 0
        for corpus in self.corpora:
            # TODO #341: Improve API to let users define what they want (everything, just specific types, ...)
            curr_size = corpus.uncompressed_size_in_bytes(Documents.SOURCE_FORMAT_BULK)
            if curr_size is None:
                return None
            size += curr_size
        return size

    def index_names(self):
        return [i.name for i in self.indices]

    def data_stream_names(self):
        return [i.name for i in self.data_streams]

    def __str__(self):
        return self.name

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def __hash__(self):
        # ``meta_data`` is a dict and all other collections are lists, i.e. none of them is hashable. The hash is thus
        # derived from immutable projections (meta-data keys and element names). Equal tracks have equal keys and
        # equally named elements so this stays consistent with ``__eq__`` without relying on the elements' own hashes.
        def names(elements):
            return tuple(element.name for element in elements)

        return hash(
            (
                self.name,
                frozenset(self.meta_data),
                self.description,
                names(self.challenges),
                names(self.indices),
                names(self.data_streams),
                names(self.templates),
                names(self.composable_templates),
                names(self.component_templates),
                names(self.corpora),
            )
        )

    def __eq__(self, othr):
        # ``dependencies``, ``has_plugins`` and ``root`` do not take part in the comparison
        return isinstance(othr, type(self)) and (
            self.name,
            self.meta_data,
            self.description,
            self.challenges,
            self.indices,
            self.data_streams,
            self.templates,
            self.composable_templates,
            self.component_templates,
            self.corpora,
        ) == (
            othr.name,
            othr.meta_data,
            othr.description,
            othr.challenges,
            othr.indices,
            othr.data_streams,
            othr.templates,
            othr.composable_templates,
            othr.component_templates,
            othr.corpora,
        )
class Challenge:
    """
    A challenge defines the concrete operations that will be done.
    """

    def __init__(
        self,
        name,
        description=None,
        user_info=None,
        default=False,
        selected=False,
        auto_generated=False,
        parameters=None,
        meta_data=None,
        schedule=None,
    ):
        """
        Creates a new challenge.

        :param name: The name of this challenge.
        :param description: A description of this challenge. Optional.
        :param user_info: Stored as ``user_info``; does not take part in equality. Optional.
        :param default: Whether this is the default challenge of its track. Default: False.
        :param selected: Whether this challenge has been selected. Default: False.
        :param auto_generated: Stored as ``auto_generated``. Default: False.
        :param parameters: A dict of parameters. Default: {}.
        :param meta_data: A dict of meta-data. Default: {}.
        :param schedule: A list of schedule elements. Default: [].
        """
        self.name = name
        self.parameters = parameters if parameters else {}
        self.meta_data = meta_data if meta_data else {}
        self.description = description
        self.user_info = user_info
        self.serverless_info = []
        self.default = default
        self.selected = selected
        self.auto_generated = auto_generated
        self.schedule = schedule if schedule else []

    def prepend_tasks(self, tasks):
        """
        Puts the provided tasks in front of the current schedule.

        :param tasks: A list of schedule elements.
        """
        self.schedule = tasks + self.schedule

    def remove_task(self, task):
        """
        Removes the first occurrence of the provided task from the schedule.

        :param task: The schedule element to remove.
        :raises ValueError: If the task is not part of the schedule.
        """
        self.schedule.remove(task)

    def __str__(self):
        return self.name

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def __hash__(self):
        # ``parameters`` and ``meta_data`` are dicts and ``schedule`` is a list, i.e. none of them is hashable. Hash
        # immutable projections instead (dict keys, schedule length): equal challenges agree on all of them so this
        # stays consistent with ``__eq__`` without relying on the hashes of the schedule elements.
        return hash(
            (
                self.name,
                self.description,
                self.default,
                self.selected,
                self.auto_generated,
                frozenset(self.parameters),
                frozenset(self.meta_data),
                len(self.schedule),
            )
        )

    def __eq__(self, othr):
        # ``user_info`` and ``serverless_info`` do not take part in the comparison
        return isinstance(othr, type(self)) and (
            self.name,
            self.description,
            self.default,
            self.selected,
            self.auto_generated,
            self.parameters,
            self.meta_data,
            self.schedule,
        ) == (
            othr.name,
            othr.description,
            othr.default,
            othr.selected,
            othr.auto_generated,
            othr.parameters,
            othr.meta_data,
            othr.schedule,
        )
@unique
class AdminStatus(Enum):
    """
    Flags whether an operation type is an administrative action.
    """

    # ``True`` and ``False`` are keywords and thus cannot serve as member names
    Yes = auto()
    No = auto()
@unique
class OperationType(Enum):
    """
    All operation types known to this module. Each member's value is a tuple of ``(id, admin status, serverless status)``
    which is unpacked into ``__init__`` so that the parts are available as the attributes ``id``, ``admin_status`` and
    ``serverless_status``.
    """

    # TODO replace manual counts with auto() when we drop support for Python 3.10
    # https://docs.python.org/3/library/enum.html#enum.auto
    IndexStats = (1, AdminStatus.No, serverless.Status.Internal)
    NodeStats = (2, AdminStatus.No, serverless.Status.Internal)
    Search = (3, AdminStatus.No, serverless.Status.Public)
    Bulk = (4, AdminStatus.No, serverless.Status.Public)
    # Public as we can't verify the actual status
    RawRequest = (5, AdminStatus.No, serverless.Status.Public)
    WaitForRecovery = (6, AdminStatus.No, serverless.Status.Internal)
    WaitForSnapshotCreate = (7, AdminStatus.No, serverless.Status.Internal)
    # Public as all supported operation types are Public too (including RawRequest as
    # mentioned above)
    Composite = (8, AdminStatus.No, serverless.Status.Public)
    SubmitAsyncSearch = (9, AdminStatus.No, serverless.Status.Public)
    GetAsyncSearch = (10, AdminStatus.No, serverless.Status.Public)
    DeleteAsyncSearch = (11, AdminStatus.No, serverless.Status.Public)
    PaginatedSearch = (12, AdminStatus.No, serverless.Status.Public)
    ScrollSearch = (13, AdminStatus.No, serverless.Status.Public)
    OpenPointInTime = (14, AdminStatus.No, serverless.Status.Public)
    ClosePointInTime = (15, AdminStatus.No, serverless.Status.Public)
    Sql = (16, AdminStatus.No, serverless.Status.Public)
    FieldCaps = (17, AdminStatus.No, serverless.Status.Public)
    CompositeAgg = (18, AdminStatus.No, serverless.Status.Public)
    WaitForCurrentSnapshotsCreate = (19, AdminStatus.No, serverless.Status.Internal)
    Downsample = (20, AdminStatus.No, serverless.Status.Internal)
    Esql = (21, AdminStatus.No, serverless.Status.Public)

    # administrative actions
    ForceMerge = (22, AdminStatus.Yes, serverless.Status.Internal)
    ClusterHealth = (23, AdminStatus.Yes, serverless.Status.Internal)
    PutPipeline = (24, AdminStatus.Yes, serverless.Status.Public)
    Refresh = (25, AdminStatus.Yes, serverless.Status.Public)
    CreateIndex = (26, AdminStatus.Yes, serverless.Status.Public)
    DeleteIndex = (27, AdminStatus.Yes, serverless.Status.Public)
    CreateIndexTemplate = (28, AdminStatus.Yes, serverless.Status.Blocked)
    DeleteIndexTemplate = (29, AdminStatus.Yes, serverless.Status.Blocked)
    ShrinkIndex = (30, AdminStatus.Yes, serverless.Status.Blocked)
    CreateMlDatafeed = (31, AdminStatus.Yes, serverless.Status.Public)
    DeleteMlDatafeed = (32, AdminStatus.Yes, serverless.Status.Public)
    StartMlDatafeed = (33, AdminStatus.Yes, serverless.Status.Public)
    StopMlDatafeed = (34, AdminStatus.Yes, serverless.Status.Public)
    CreateMlJob = (35, AdminStatus.Yes, serverless.Status.Public)
    DeleteMlJob = (36, AdminStatus.Yes, serverless.Status.Public)
    OpenMlJob = (37, AdminStatus.Yes, serverless.Status.Public)
    CloseMlJob = (38, AdminStatus.Yes, serverless.Status.Public)
    Sleep = (39, AdminStatus.Yes, serverless.Status.Public)
    DeleteSnapshotRepository = (40, AdminStatus.Yes, serverless.Status.Internal)
    CreateSnapshotRepository = (41, AdminStatus.Yes, serverless.Status.Internal)
    CreateSnapshot = (42, AdminStatus.Yes, serverless.Status.Internal)
    RestoreSnapshot = (43, AdminStatus.Yes, serverless.Status.Internal)
    PutSettings = (44, AdminStatus.Yes, serverless.Status.Internal)
    CreateTransform = (45, AdminStatus.Yes, serverless.Status.Public)
    StartTransform = (46, AdminStatus.Yes, serverless.Status.Public)
    WaitForTransform = (47, AdminStatus.Yes, serverless.Status.Public)
    DeleteTransform = (48, AdminStatus.Yes, serverless.Status.Public)
    CreateDataStream = (49, AdminStatus.Yes, serverless.Status.Public)
    DeleteDataStream = (50, AdminStatus.Yes, serverless.Status.Public)
    CreateComposableTemplate = (51, AdminStatus.Yes, serverless.Status.Public)
    DeleteComposableTemplate = (52, AdminStatus.Yes, serverless.Status.Public)
    CreateComponentTemplate = (53, AdminStatus.Yes, serverless.Status.Public)
    DeleteComponentTemplate = (54, AdminStatus.Yes, serverless.Status.Public)
    TransformStats = (55, AdminStatus.Yes, serverless.Status.Public)
    CreateIlmPolicy = (56, AdminStatus.Yes, serverless.Status.Blocked)
    DeleteIlmPolicy = (57, AdminStatus.Yes, serverless.Status.Blocked)

    def __init__(self, id: int, admin_status: AdminStatus, serverless_status: serverless.Status):
        self.id = id
        self.admin_status = admin_status
        self.serverless_status = serverless_status

    @property
    def admin_op(self):
        """
        :return: True iff this operation type is an administrative action.
        """
        return self.admin_status == AdminStatus.Yes

    def to_hyphenated_string(self):
        """
        Turns enum constants into hyphenated names, e.g. ``WaitForTransform`` becomes ``wait-for-transform``.
        """
        return "".join(["-" + c.lower() if c.isupper() else c for c in self.name]).lstrip("-")

    @classmethod
    def from_hyphenated_string(cls, v):
        """
        Inverse of ``to_hyphenated_string``, e.g. ``wait-for-transform`` becomes ``OperationType.WaitForTransform``.

        :param v: The hyphenated name of an operation type.
        :return: The corresponding enum member.
        :raises KeyError: If no member has the provided hyphenated name.
        """
        # Derive the mapping from the members themselves instead of maintaining a second, hand-written list
        # that needs to be kept in sync with the member definitions.
        for op_type in cls:
            if op_type.to_hyphenated_string() == v:
                return op_type
        raise KeyError(f"No enum value for [{v}]")
class TaskNameFilter:
    """
    Task filter that selects tasks by their name.
    """

    def __init__(self, name):
        """
        :param name: The task name that a task needs to have in order to match.
        """
        self.name = name

    def matches(self, task):
        """
        :param task: The task to check.
        :return: True iff the name of the task equals the name of this filter.
        """
        expected = self.name
        return expected == task.name

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.name == other.name

    def __str__(self, *args, **kwargs):
        return f"filter for task name [{self.name}]"
class TaskOpTypeFilter:
    """
    Task filter that selects tasks by the type of their operation.
    """

    def __init__(self, op_type_name):
        """
        :param op_type_name: The operation type that a task's operation needs to have in order to match.
        """
        self.op_type = op_type_name

    def matches(self, task):
        """
        :param task: The task to check.
        :return: True iff the type of the task's operation equals the operation type of this filter.
        """
        expected = self.op_type
        return expected == task.operation.type

    def __hash__(self):
        return hash(self.op_type)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.op_type == other.op_type

    def __str__(self, *args, **kwargs):
        return f"filter for operation type [{self.op_type}]"
class TaskTagFilter:
    """
    Task filter that selects tasks by one of their tags.
    """

    def __init__(self, tag_name):
        """
        :param tag_name: The tag that a task needs to carry in order to match.
        """
        self.tag_name = tag_name

    def matches(self, task):
        """
        :param task: The task to check.
        :return: True iff the tag of this filter is contained in the tags of the task.
        """
        wanted = self.tag_name
        return wanted in task.tags

    def __hash__(self):
        return hash(self.tag_name)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.tag_name == other.tag_name

    def __str__(self, *args, **kwargs):
        return f"filter for tasks tagged [{self.tag_name}]"
class Singleton(type):
    """
    Metaclass that creates at most one instance per class: the first call creates and stores the instance (using the
    arguments of that call), every later call returns the stored instance.
    """

    # maps each class to its one and only instance
    _instances = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        registry[cls] = super().__call__(*args, **kwargs)
        return registry[cls]
# Schedule elements
class Parallel:
    """
    A schedule element that groups several tasks. Equality and hash are based on the contained tasks only.
    """

    def __init__(self, tasks, clients=None):
        """
        :param tasks: A list of the tasks contained in this element.
        :param clients: The number of clients for this element. If ``None``, the number of clients is the sum of
                        the clients of all contained tasks.
        """
        self.tasks = tasks
        self._clients = clients
        self.nested = True

    @property
    def clients(self):
        """
        :return: The explicitly provided number of clients or, if none has been provided, the sum of the clients of
                 all contained tasks.
        """
        if self._clients is not None:
            return self._clients
        return sum(task.clients for task in self.tasks)

    def matches(self, task_filter):
        """
        :param task_filter: The filter to apply.
        :return: True iff any of the contained tasks matches the filter.
        """
        # a parallel element matches if any of its elements match
        return any(task.matches(task_filter) for task in self.tasks)

    def remove_task(self, task):
        """
        Removes the first occurrence of the provided task.

        :param task: The task to remove.
        :raises ValueError: If the task is not contained in this element.
        """
        self.tasks.remove(task)

    def __iter__(self):
        return iter(self.tasks)

    def __str__(self, *args, **kwargs):
        return "%d parallel tasks" % len(self.tasks)

    def __repr__(self):
        return ", ".join("%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items())

    def __hash__(self):
        # ``tasks`` is a list and thus not hashable itself; hash an immutable snapshot instead
        return hash(tuple(self.tasks))

    def __eq__(self, other):
        return isinstance(other, type(self)) and self.tasks == other.tasks
# A target throughput as a pair of a numeric ``value`` and its ``unit``. ``Task.target_throughput`` creates instances
# with the unit "ops/s" unless a throughput string specifies a different unit.
Throughput = collections.namedtuple("Throughput", ["value", "unit"])
class Task:
    """
    A leaf schedule element: it binds an operation to its execution properties (iterations, time periods, clients,
    ...). ``params`` and ``tags`` do not take part in equality and hash.
    """

    THROUGHPUT_PATTERN = re.compile(r"(?P<value>(\d*\.)?\d+)\s(?P<unit>\w+/s)")
    IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST = ["non-fatal"]

    def __init__(
        self,
        name,
        operation,
        tags=None,
        meta_data=None,
        warmup_iterations=None,
        iterations=None,
        warmup_time_period=None,
        time_period=None,
        ramp_up_time_period=None,
        clients=1,
        completes_parent=False,
        any_completes_parent=False,
        schedule=None,
        params=None,
    ):
        self.name = name
        self.operation = operation
        # a single tag may be provided as a plain string; tags are always stored as a list
        if isinstance(tags, str):
            tags = [tags]
        self.tags = tags if tags else []
        self.meta_data = meta_data or {}
        self.warmup_iterations = warmup_iterations
        self.iterations = iterations
        self.warmup_time_period = warmup_time_period
        self.time_period = time_period
        self.ramp_up_time_period = ramp_up_time_period
        self.clients = clients
        self.completes_parent = completes_parent
        self.any_completes_parent = any_completes_parent
        self.schedule = schedule
        self.params = params or {}
        self.nested = False

    def matches(self, task_filter):
        """
        :param task_filter: The filter to apply.
        :return: The result of applying the filter to this task.
        """
        return task_filter.matches(self)

    @property
    def target_throughput(self):
        """
        Derives the target throughput from the task parameters ``target-throughput`` (a number or a string like
        ``"10 docs/s"``) or ``target-interval`` (a number; the throughput is its reciprocal).

        :return: A ``Throughput`` instance or ``None`` if no (or a zero) target throughput is specified.
        :raises exceptions.InvalidSyntax: If both parameters are specified or a parameter value is invalid.
        """

        def is_number(candidate):
            # bools would pass the number check (False -> 0, True -> 1) but must not be accepted here
            if isinstance(candidate, bool):
                return False
            return isinstance(candidate, numbers.Number)

        throughput_spec = self.params.get("target-throughput")
        interval_spec = self.params.get("target-interval")

        if throughput_spec is not None and interval_spec is not None:
            raise exceptions.InvalidSyntax(
                f"Task [{self}] specifies target-interval [{interval_spec}] and "
                f"target-throughput [{throughput_spec}] but only one of them is allowed."
            )

        rate = None
        rate_unit = "ops/s"
        if interval_spec:
            if not is_number(interval_spec):
                raise exceptions.InvalidSyntax(f"Target interval [{interval_spec}] for task [{self}] must be numeric.")
            rate = 1 / float(interval_spec)
        elif throughput_spec:
            if isinstance(throughput_spec, str):
                parsed = Task.THROUGHPUT_PATTERN.match(throughput_spec)
                if not parsed:
                    raise exceptions.InvalidSyntax(f"Task [{self}] specifies invalid target throughput [{throughput_spec}].")
                rate = float(parsed.group("value"))
                rate_unit = parsed.group("unit")
            elif is_number(throughput_spec):
                rate = float(throughput_spec)
            else:
                raise exceptions.InvalidSyntax(f"Target throughput [{throughput_spec}] for task [{self}] must be string or numeric.")

        return Throughput(rate, rate_unit) if rate else None

    @property
    def ignore_response_error_level(self):
        """
        :return: The value of the task parameter ``ignore-response-error-level`` (``None`` if it is not specified).
        :raises exceptions.InvalidSyntax: If the parameter is set to a value that is not whitelisted.
        """
        level = self.params.get("ignore-response-error-level")
        allowed_levels = Task.IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST
        if level and level not in allowed_levels:
            raise exceptions.InvalidSyntax(
                f"Task [{self}] specifies ignore-response-error-level to [{level}] but "
                f"the only allowed values are [{','.join(allowed_levels)}]."
            )
        return level

    def error_behavior(self, default_error_behavior):
        """
        Returns the desired behavior when encountering errors during task execution.

        :param default_error_behavior: (str) the default error behavior for the benchmark
        :return: (str) prescribing error handling when a non-fatal error occurs:
                 "abort": will fail when any error gets encountered
                 "continue": will continue for non fatal errors
        """
        # the task-level setting is only consulted if the benchmark would abort by default
        if default_error_behavior == "abort" and self.ignore_response_error_level != "non-fatal":
            return "abort"
        return "continue"

    def _identity(self):
        # Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
        return (
            self.name,
            self.operation,
            self.warmup_iterations,
            self.iterations,
            self.warmup_time_period,
            self.time_period,
            self.ramp_up_time_period,
            self.clients,
            self.schedule,
            self.completes_parent,
            self.any_completes_parent,
        )

    def __hash__(self):
        combined = 0
        for prop in self._identity():
            combined ^= hash(prop)
        return combined

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self._identity() == other._identity()

    def __iter__(self):
        return iter([self])

    def __str__(self, *args, **kwargs):
        return self.name

    def __repr__(self):
        described = ["%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items()]
        return ", ".join(described)
class Operation:
    """
    Describes an operation by its name, its type and its parameters. Two operations are considered equal (and hash
    identically) iff they have the same name.
    """

    def __init__(self, name, operation_type, meta_data=None, params=None, param_source=None):
        """
        :param name: The name of this operation.
        :param operation_type: The type of this operation. Available as the attribute ``type``.
        :param meta_data: A dict of meta-data. Default: {}.
        :param params: A dict of parameters. Default: {}.
        :param param_source: Stored as ``param_source``. Optional.
        """
        self.name = name
        self.meta_data = meta_data or {}
        self.type = operation_type
        self.params = {} if params is None else params
        self.param_source = param_source

    @property
    def include_in_reporting(self):
        """
        :return: The value of the parameter ``include-in-reporting`` or True if it is not specified.
        """
        return self.params.get("include-in-reporting", True)

    @property
    def run_on_serverless(self):
        """
        :return: The value of the parameter ``run-on-serverless`` or ``None`` if it is not specified.
        """
        return self.params.get("run-on-serverless", None)

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.name == other.name

    def __str__(self, *args, **kwargs):
        return self.name

    def __repr__(self):
        described = ["%s = [%s]" % (prop, repr(value)) for prop, value in vars(self).items()]
        return ", ".join(described)