# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import collections
import numbers
import re
from enum import Enum, auto, unique

from esrally import exceptions
from esrally.utils import serverless


class Index:
    """
    Defines an index in Elasticsearch.
    """

    def __init__(self, name, body=None, types=None):
        """

        Creates a new index.

        :param name: The index name. Mandatory.
        :param body: A dict representation of the index body. Optional.
        :param types: A list of types. Should contain at least one type.
        """
        if types is None:
            types = []
        if body is None:
            body = {}
        self.name = name
        self.body = body
        self.types = types

    def matches(self, pattern):
        if pattern is None:
            return True
        elif pattern in ["_all", "*"]:
            return True
        elif self.name == pattern:
            return True
        else:
            return False

    def __str__(self):
        return self.name

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        return self.name == other.name


class DataStream:
    """
    Defines a data stream in Elasticsearch.
    """

    def __init__(self, name):
        """

        Creates a new data stream.

        :param name: The data stream name. Mandatory.
        """
        self.name = name

    def matches(self, pattern):
        if pattern is None:
            return True
        elif pattern in ["_all", "*"]:
            return True
        elif self.name == pattern:
            return True
        else:
            return False

    def __str__(self):
        return self.name

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        return self.name == other.name


class IndexTemplate:
    """
    Defines an index template in Elasticsearch.
    """

    def __init__(self, name, pattern, content, delete_matching_indices=False):
        """

        Creates a new index template.

        :param name: Name of the index template. Mandatory.
        :param pattern: The index pattern to which the index template applies. Mandatory.
        :param content: The content of the corresponding template. Mandatory.
        :param delete_matching_indices: Delete all indices that match the pattern before the benchmark iff True.
        """
        self.name = name
        self.pattern = pattern
        self.content = content
        self.delete_matching_indices = delete_matching_indices

    def __str__(self, *args, **kwargs):
        return self.name

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        return self.name == other.name


class ComponentTemplate:
    """
    Defines a component template in Elasticsearch.
    """

    def __init__(self, name, content):
        """
        Creates a new index template.
        :param name: Name of the index template. Mandatory.
        :param content: The content of the corresponding template. Mandatory.
        """
        self.name = name
        self.content = content

    def __str__(self, *args, **kwargs):
        return self.name

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        return self.name == other.name


class Documents:
    SOURCE_FORMAT_BULK = "bulk"

    def __init__(
        self,
        source_format,
        document_file=None,
        document_archive=None,
        base_url=None,
        includes_action_and_meta_data=False,
        number_of_documents=0,
        compressed_size_in_bytes=0,
        uncompressed_size_in_bytes=0,
        target_index=None,
        target_data_stream=None,
        target_type=None,
        meta_data=None,
    ):
        """

        :param source_format: The format of these documents. Mandatory.
        :param document_file: The file name of benchmark documents after decompression. Optional (e.g. for percolation we
        just need a mapping but no documents)
        :param document_archive: The file name of the compressed benchmark document name on the remote server. Optional (e.g. for
        percolation we just need a mapping but no documents)
        :param base_url: The URL from which to load data if they are not available locally. Optional.
        :param includes_action_and_meta_data: True, if the source file already includes the action and meta-data line. False, if it only
        contains documents.
        :param number_of_documents: The number of documents in the benchmark document. Needed for proper progress reporting. Only needed if
         a document_archive is given.
        :param compressed_size_in_bytes: The compressed size in bytes of the benchmark document. Needed for verification of the download and
         user reporting. Only useful if a document_archive is given (optional but recommended to be set).
        :param uncompressed_size_in_bytes: The size in bytes of the benchmark document after decompressing it.
        Only useful if a document_archive is given (optional but recommended to be set).
        :param target_index: The index to target for bulk operations. May be ``None`` if ``includes_action_and_meta_data`` is ``False``.
        :param target_data_stream: The data stream to target for bulk operations.
        Maybe be ``None`` if ``includes_action_and_meta_data`` is ``False``.
        :param target_type: The document type to target for bulk operations. May be ``None`` if ``includes_action_and_meta_data``
                            is ``False``.
        :param meta_data: A dict containing key-value pairs with additional meta-data describing documents. Optional.
        """

        self.source_format = source_format
        self.document_file = document_file
        self.document_archive = document_archive
        self.base_url = base_url
        self.includes_action_and_meta_data = includes_action_and_meta_data
        self._number_of_documents = number_of_documents
        self._compressed_size_in_bytes = compressed_size_in_bytes
        self._uncompressed_size_in_bytes = uncompressed_size_in_bytes
        self.target_index = target_index
        self.target_data_stream = target_data_stream
        self.target_type = target_type
        self.meta_data = meta_data or {}

    def has_compressed_corpus(self):
        return self.document_archive is not None

    def has_uncompressed_corpus(self):
        return self.document_file is not None

    @property
    def number_of_documents(self):
        return self._number_of_documents

    @number_of_documents.setter
    def number_of_documents(self, value):
        self._number_of_documents = value

    @property
    def uncompressed_size_in_bytes(self):
        return self._uncompressed_size_in_bytes

    @uncompressed_size_in_bytes.setter
    def uncompressed_size_in_bytes(self, value):
        self._uncompressed_size_in_bytes = value

    @property
    def compressed_size_in_bytes(self):
        return self._compressed_size_in_bytes

    @compressed_size_in_bytes.setter
    def compressed_size_in_bytes(self, value):
        self._compressed_size_in_bytes = value

    @property
    def number_of_lines(self):
        if self.includes_action_and_meta_data:
            return self.number_of_documents * 2
        else:
            return self.number_of_documents

    @property
    def is_bulk(self):
        return self.source_format == Documents.SOURCE_FORMAT_BULK

    def __str__(self):
        return "%s documents from %s" % (self.source_format, self.document_file)

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)

    def __hash__(self):
        return (
            hash(self.source_format)
            ^ hash(self.document_file)
            ^ hash(self.document_archive)
            ^ hash(self.base_url)
            ^ hash(self.includes_action_and_meta_data)
            ^ hash(self.number_of_documents)
            ^ hash(self.compressed_size_in_bytes)
            ^ hash(self.uncompressed_size_in_bytes)
            ^ hash(self.target_index)
            ^ hash(self.target_data_stream)
            ^ hash(self.target_type)
            ^ hash(frozenset(self.meta_data.items()))
        )

    def __eq__(self, othr):
        return isinstance(othr, type(self)) and (
            self.source_format,
            self.document_file,
            self.document_archive,
            self.base_url,
            self.includes_action_and_meta_data,
            self.number_of_documents,
            self.compressed_size_in_bytes,
            self.uncompressed_size_in_bytes,
            self.target_type,
            self.target_data_stream,
            self.target_type,
            self.meta_data,
        ) == (
            othr.source_format,
            othr.document_file,
            othr.document_archive,
            othr.base_url,
            othr.includes_action_and_meta_data,
            othr.number_of_documents,
            othr.compressed_size_in_bytes,
            othr.uncompressed_size_in_bytes,
            othr.target_type,
            othr.target_data_stream,
            othr.target_type,
            othr.meta_data,
        )


class DocumentCorpus:
    def __init__(self, name, documents=None, meta_data=None):
        """

        :param name: The name of this document corpus. Mandatory.
        :param documents: A list of ``Documents`` instances that belong to this corpus.
        :param meta_data: A dict containing key-value pairs with additional meta-data describing this corpus. Optional.
        """
        self.name = name
        self.documents = documents or []
        self.meta_data = meta_data or {}

    def number_of_documents(self, source_format):
        num = 0
        for doc in self.documents:
            if doc.source_format == source_format:
                num += doc.number_of_documents
        return num

    def compressed_size_in_bytes(self, source_format):
        num = 0
        for doc in self.documents:
            if doc.source_format == source_format and doc.compressed_size_in_bytes is not None:
                num += doc.compressed_size_in_bytes
            else:
                return None
        return num

    def uncompressed_size_in_bytes(self, source_format):
        num = 0
        for doc in self.documents:
            if doc.source_format == source_format and doc.uncompressed_size_in_bytes is not None:
                num += doc.uncompressed_size_in_bytes
            else:
                return None
        return num

    def filter(self, source_format=None, target_indices=None, target_data_streams=None):
        filtered = []
        for d in self.documents:
            # skip if source format or target index does not match
            if source_format and d.source_format != source_format:
                continue
            if target_indices and d.target_index not in target_indices:
                continue
            if target_data_streams and d.target_data_stream not in target_data_streams:
                continue

            filtered.append(d)
        return DocumentCorpus(self.name, filtered, meta_data=dict(self.meta_data))

    def union(self, other):
        """
        Creates a new corpus based on the current and the provided other corpus. This is not meant as a generic union
        of two arbitrary corpora but rather to unify the documents referenced by two instances of the same corpus. This
        is useful when two tasks reference different subsets of a corpus and a unified view (e.g. for downloading the
        appropriate document files) is required.

        :param other: The other corpus to unify with this one. Must have the same name and meta-data.
        :return: A document corpus instance with the same and meta-data but with documents from both corpora.
        """
        if self.name != other.name:
            raise exceptions.RallyAssertionError(f"Corpora names differ: [{self.name}] and [{other.name}].")
        if self.meta_data != other.meta_data:
            raise exceptions.RallyAssertionError(f"Corpora meta-data differ: [{self.meta_data}] and [{other.meta_data}].")
        if self is other:
            return self
        else:
            return DocumentCorpus(
                name=self.name, documents=list(set(self.documents).union(other.documents)), meta_data=dict(self.meta_data)
            )

    def __str__(self):
        return self.name

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)

    def __hash__(self):
        return hash(self.name) ^ hash(self.documents) ^ hash(frozenset(self.meta_data.items()))

    def __eq__(self, othr):
        return isinstance(othr, type(self)) and (self.name, self.documents, self.meta_data) == (othr.name, othr.documents, othr.meta_data)


class Track:
    """
    A track defines the data set that is used. It corresponds loosely to a use case (e.g. logging, event processing, analytics, ...)
    """

    def __init__(
        self,
        name,
        description=None,
        meta_data=None,
        challenges=None,
        indices=None,
        data_streams=None,
        templates=None,
        composable_templates=None,
        component_templates=None,
        corpora=None,
        dependencies=None,
        has_plugins=False,
        root=None,
    ):
        """

        Creates a new track.

        :param name: A short, descriptive name for this track. As per convention, this name should be in lower-case without spaces.
        :param description: A description for this track (should be less than 80 characters).
        :param meta_data: An optional dict of meta-data elements to attach to each metrics record. Default: {}.
        :param challenges: A list of one or more challenges to use. Precondition: If the list is non-empty it contains exactly one element
        with its ``default`` property set to ``True``.
        :param indices: A list of indices for this track. May be None.
        :param data_streams: A list of data streams for this track. May be None.
        :param templates: A list of index templates for this track. May be None.
        :param corpora: A list of document corpus definitions for this track. May be None.
        :param has_plugins: True iff the track also defines plugins (e.g. custom runners or parameter sources).
        :param root: The absolute path to the directory containing the track file(s).
        """
        self.name = name
        self.meta_data = meta_data if meta_data else {}
        self.description = description if description is not None else ""
        self.challenges = challenges if challenges else []
        self.indices = indices if indices else []
        self.data_streams = data_streams if data_streams else []
        self.corpora = corpora if corpora else []
        self.templates = templates if templates else []
        self.composable_templates = composable_templates if composable_templates else []
        self.component_templates = component_templates if component_templates else []
        self.dependencies = dependencies if dependencies else []
        self.has_plugins = has_plugins
        self.root = root

    @property
    def default_challenge(self):
        for challenge in self.challenges:
            if challenge.default:
                return challenge
        # This should only happen if we don't have any challenges
        return None

    @property
    def selected_challenge(self):
        for challenge in self.challenges:
            if challenge.selected:
                return challenge
        return None

    @property
    def selected_challenge_or_default(self):
        selected = self.selected_challenge
        return selected if selected else self.default_challenge

    def find_challenge_or_default(self, name):
        """
        :param name: The name of the challenge to find.
        :return: The challenge with the given name. The default challenge, if the name is "" or ``None``.
        """
        if name in [None, ""]:
            return self.default_challenge
        else:
            return self.find_challenge(name)

    def find_challenge(self, name):
        for challenge in self.challenges:
            if challenge.name == name:
                return challenge
        raise exceptions.InvalidName("Unknown challenge [%s] for track [%s]" % (name, self.name))

    @property
    def number_of_documents(self):
        num_docs = 0
        for corpus in self.corpora:
            # TODO #341: Improve API to let users define what they want (everything, just specific types, ...)
            num_docs += corpus.number_of_documents(Documents.SOURCE_FORMAT_BULK)
        return num_docs

    @property
    def compressed_size_in_bytes(self):
        size = 0
        for corpus in self.corpora:
            # TODO #341: Improve API to let users define what they want (everything, just specific types, ...)
            curr_size = corpus.compressed_size_in_bytes(Documents.SOURCE_FORMAT_BULK)
            if curr_size is not None:
                size += curr_size
            else:
                return None
        return size

    @property
    def uncompressed_size_in_bytes(self):
        size = 0
        for corpus in self.corpora:
            # TODO #341: Improve API to let users define what they want (everything, just specific types, ...)
            curr_size = corpus.uncompressed_size_in_bytes(Documents.SOURCE_FORMAT_BULK)
            if curr_size is not None:
                size += curr_size
            else:
                return None
        return size

    def index_names(self):
        return [i.name for i in self.indices]

    def data_stream_names(self):
        return [i.name for i in self.data_streams]

    def __str__(self):
        return self.name

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)

    def __hash__(self):
        return (
            hash(self.name)
            ^ hash(self.meta_data)
            ^ hash(self.description)
            ^ hash(self.challenges)
            ^ hash(self.indices)
            ^ hash(self.templates)
            ^ hash(self.composable_templates)
            ^ hash(self.component_templates)
            ^ hash(self.corpora)
        )

    def __eq__(self, othr):
        return isinstance(othr, type(self)) and (
            self.name,
            self.meta_data,
            self.description,
            self.challenges,
            self.indices,
            self.data_streams,
            self.templates,
            self.composable_templates,
            self.component_templates,
            self.corpora,
        ) == (
            othr.name,
            othr.meta_data,
            othr.description,
            othr.challenges,
            othr.indices,
            othr.data_streams,
            othr.templates,
            othr.composable_templates,
            othr.component_templates,
            othr.corpora,
        )


class Challenge:
    """
    A challenge defines the concrete operations that will be done.
    """

    def __init__(
        self,
        name,
        description=None,
        user_info=None,
        default=False,
        selected=False,
        auto_generated=False,
        parameters=None,
        meta_data=None,
        schedule=None,
    ):
        self.name = name
        self.parameters = parameters if parameters else {}
        self.meta_data = meta_data if meta_data else {}
        self.description = description
        self.user_info = user_info
        self.serverless_info = []
        self.default = default
        self.selected = selected
        self.auto_generated = auto_generated
        self.schedule = schedule if schedule else []

    def prepend_tasks(self, tasks):
        self.schedule = tasks + self.schedule

    def remove_task(self, task):
        self.schedule.remove(task)

    def __str__(self):
        return self.name

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)

    def __hash__(self):
        return (
            hash(self.name)
            ^ hash(self.description)
            ^ hash(self.default)
            ^ hash(self.selected)
            ^ hash(self.auto_generated)
            ^ hash(self.parameters)
            ^ hash(self.meta_data)
            ^ hash(self.schedule)
        )

    def __eq__(self, othr):
        return isinstance(othr, type(self)) and (
            self.name,
            self.description,
            self.default,
            self.selected,
            self.auto_generated,
            self.parameters,
            self.meta_data,
            self.schedule,
        ) == (
            othr.name,
            othr.description,
            othr.default,
            othr.selected,
            othr.auto_generated,
            othr.parameters,
            othr.meta_data,
            othr.schedule,
        )


@unique
class AdminStatus(Enum):
    # We can't use True/False as they are keywords
    Yes = auto()
    No = auto()


@unique
class OperationType(Enum):
    # TODO replace manual counts with auto() when we drop support for Python 3.10
    # https://docs.python.org/3/library/enum.html#enum.auto
    IndexStats = (1, AdminStatus.No, serverless.Status.Internal)
    NodeStats = (2, AdminStatus.No, serverless.Status.Internal)
    Search = (3, AdminStatus.No, serverless.Status.Public)
    Bulk = (4, AdminStatus.No, serverless.Status.Public)
    # Public as we can't verify the actual status
    RawRequest = (5, AdminStatus.No, serverless.Status.Public)
    WaitForRecovery = (6, AdminStatus.No, serverless.Status.Internal)
    WaitForSnapshotCreate = (7, AdminStatus.No, serverless.Status.Internal)
    # Public as all supported operation types are Public too (including RawRequest as
    # mentioned above)
    Composite = (8, AdminStatus.No, serverless.Status.Public)
    SubmitAsyncSearch = (9, AdminStatus.No, serverless.Status.Public)
    GetAsyncSearch = (10, AdminStatus.No, serverless.Status.Public)
    DeleteAsyncSearch = (11, AdminStatus.No, serverless.Status.Public)
    PaginatedSearch = (12, AdminStatus.No, serverless.Status.Public)
    ScrollSearch = (13, AdminStatus.No, serverless.Status.Public)
    OpenPointInTime = (14, AdminStatus.No, serverless.Status.Public)
    ClosePointInTime = (15, AdminStatus.No, serverless.Status.Public)
    Sql = (16, AdminStatus.No, serverless.Status.Public)
    FieldCaps = (17, AdminStatus.No, serverless.Status.Public)
    CompositeAgg = (18, AdminStatus.No, serverless.Status.Public)
    WaitForCurrentSnapshotsCreate = (19, AdminStatus.No, serverless.Status.Internal)
    Downsample = (20, AdminStatus.No, serverless.Status.Internal)
    Esql = (21, AdminStatus.No, serverless.Status.Public)

    # administrative actions
    ForceMerge = (22, AdminStatus.Yes, serverless.Status.Internal)
    ClusterHealth = (23, AdminStatus.Yes, serverless.Status.Internal)
    PutPipeline = (24, AdminStatus.Yes, serverless.Status.Public)
    Refresh = (25, AdminStatus.Yes, serverless.Status.Public)
    CreateIndex = (26, AdminStatus.Yes, serverless.Status.Public)
    DeleteIndex = (27, AdminStatus.Yes, serverless.Status.Public)
    CreateIndexTemplate = (28, AdminStatus.Yes, serverless.Status.Blocked)
    DeleteIndexTemplate = (29, AdminStatus.Yes, serverless.Status.Blocked)
    ShrinkIndex = (30, AdminStatus.Yes, serverless.Status.Blocked)
    CreateMlDatafeed = (31, AdminStatus.Yes, serverless.Status.Public)
    DeleteMlDatafeed = (32, AdminStatus.Yes, serverless.Status.Public)
    StartMlDatafeed = (33, AdminStatus.Yes, serverless.Status.Public)
    StopMlDatafeed = (34, AdminStatus.Yes, serverless.Status.Public)
    CreateMlJob = (35, AdminStatus.Yes, serverless.Status.Public)
    DeleteMlJob = (36, AdminStatus.Yes, serverless.Status.Public)
    OpenMlJob = (37, AdminStatus.Yes, serverless.Status.Public)
    CloseMlJob = (38, AdminStatus.Yes, serverless.Status.Public)
    Sleep = (39, AdminStatus.Yes, serverless.Status.Public)
    DeleteSnapshotRepository = (40, AdminStatus.Yes, serverless.Status.Internal)
    CreateSnapshotRepository = (41, AdminStatus.Yes, serverless.Status.Internal)
    CreateSnapshot = (42, AdminStatus.Yes, serverless.Status.Internal)
    RestoreSnapshot = (43, AdminStatus.Yes, serverless.Status.Internal)
    PutSettings = (44, AdminStatus.Yes, serverless.Status.Internal)
    CreateTransform = (45, AdminStatus.Yes, serverless.Status.Public)
    StartTransform = (46, AdminStatus.Yes, serverless.Status.Public)
    WaitForTransform = (47, AdminStatus.Yes, serverless.Status.Public)
    DeleteTransform = (48, AdminStatus.Yes, serverless.Status.Public)
    CreateDataStream = (49, AdminStatus.Yes, serverless.Status.Public)
    DeleteDataStream = (50, AdminStatus.Yes, serverless.Status.Public)
    CreateComposableTemplate = (51, AdminStatus.Yes, serverless.Status.Public)
    DeleteComposableTemplate = (52, AdminStatus.Yes, serverless.Status.Public)
    CreateComponentTemplate = (53, AdminStatus.Yes, serverless.Status.Public)
    DeleteComponentTemplate = (54, AdminStatus.Yes, serverless.Status.Public)
    TransformStats = (55, AdminStatus.Yes, serverless.Status.Public)
    CreateIlmPolicy = (56, AdminStatus.Yes, serverless.Status.Blocked)
    DeleteIlmPolicy = (57, AdminStatus.Yes, serverless.Status.Blocked)

    def __init__(self, id: int, admin_status: AdminStatus, serverless_status: serverless.Status):
        self.id = id
        self.admin_status = admin_status
        self.serverless_status = serverless_status

    @property
    def admin_op(self):
        return self.admin_status == AdminStatus.Yes

    def to_hyphenated_string(self):
        """
        Turns enum constants into hyphenated names, e.g. ``WaitForTransform`` becomes ``wait-for-transform``.
        """
        return "".join(["-" + c.lower() if c.isupper() else c for c in self.name]).lstrip("-")

    # pylint: disable=too-many-return-statements
    @classmethod
    def from_hyphenated_string(cls, v):
        if v == "force-merge":
            return OperationType.ForceMerge
        elif v == "index-stats":
            return OperationType.IndexStats
        elif v == "node-stats":
            return OperationType.NodeStats
        elif v == "search":
            return OperationType.Search
        elif v == "scroll-search":
            return OperationType.ScrollSearch
        elif v == "paginated-search":
            return OperationType.PaginatedSearch
        elif v == "composite-agg":
            return OperationType.CompositeAgg
        elif v == "cluster-health":
            return OperationType.ClusterHealth
        elif v == "bulk":
            return OperationType.Bulk
        elif v == "raw-request":
            return OperationType.RawRequest
        elif v == "put-pipeline":
            return OperationType.PutPipeline
        elif v == "refresh":
            return OperationType.Refresh
        elif v == "create-index":
            return OperationType.CreateIndex
        elif v == "delete-index":
            return OperationType.DeleteIndex
        elif v == "create-index-template":
            return OperationType.CreateIndexTemplate
        elif v == "delete-index-template":
            return OperationType.DeleteIndexTemplate
        elif v == "create-composable-template":
            return OperationType.CreateComposableTemplate
        elif v == "delete-composable-template":
            return OperationType.DeleteComposableTemplate
        elif v == "create-component-template":
            return OperationType.CreateComponentTemplate
        elif v == "delete-component-template":
            return OperationType.DeleteComponentTemplate
        elif v == "shrink-index":
            return OperationType.ShrinkIndex
        elif v == "create-ml-datafeed":
            return OperationType.CreateMlDatafeed
        elif v == "delete-ml-datafeed":
            return OperationType.DeleteMlDatafeed
        elif v == "start-ml-datafeed":
            return OperationType.StartMlDatafeed
        elif v == "stop-ml-datafeed":
            return OperationType.StopMlDatafeed
        elif v == "create-ml-job":
            return OperationType.CreateMlJob
        elif v == "delete-ml-job":
            return OperationType.DeleteMlJob
        elif v == "open-ml-job":
            return OperationType.OpenMlJob
        elif v == "close-ml-job":
            return OperationType.CloseMlJob
        elif v == "sleep":
            return OperationType.Sleep
        elif v == "delete-snapshot-repository":
            return OperationType.DeleteSnapshotRepository
        elif v == "create-snapshot-repository":
            return OperationType.CreateSnapshotRepository
        elif v == "create-snapshot":
            return OperationType.CreateSnapshot
        elif v == "wait-for-snapshot-create":
            return OperationType.WaitForSnapshotCreate
        elif v == "wait-for-current-snapshots-create":
            return OperationType.WaitForCurrentSnapshotsCreate
        elif v == "restore-snapshot":
            return OperationType.RestoreSnapshot
        elif v == "wait-for-recovery":
            return OperationType.WaitForRecovery
        elif v == "put-settings":
            return OperationType.PutSettings
        elif v == "create-transform":
            return OperationType.CreateTransform
        elif v == "start-transform":
            return OperationType.StartTransform
        elif v == "wait-for-transform":
            return OperationType.WaitForTransform
        elif v == "delete-transform":
            return OperationType.DeleteTransform
        elif v == "transform-stats":
            return OperationType.TransformStats
        elif v == "create-data-stream":
            return OperationType.CreateDataStream
        elif v == "delete-data-stream":
            return OperationType.DeleteDataStream
        elif v == "composite":
            return OperationType.Composite
        elif v == "submit-async-search":
            return OperationType.SubmitAsyncSearch
        elif v == "get-async-search":
            return OperationType.GetAsyncSearch
        elif v == "delete-async-search":
            return OperationType.DeleteAsyncSearch
        elif v == "open-point-in-time":
            return OperationType.OpenPointInTime
        elif v == "close-point-in-time":
            return OperationType.ClosePointInTime
        elif v == "create-ilm-policy":
            return OperationType.CreateIlmPolicy
        elif v == "delete-ilm-policy":
            return OperationType.DeleteIlmPolicy
        elif v == "sql":
            return OperationType.Sql
        elif v == "field-caps":
            return OperationType.FieldCaps
        elif v == "downsample":
            return OperationType.Downsample
        elif v == "esql":
            return OperationType.Esql
        else:
            raise KeyError(f"No enum value for [{v}]")


class TaskNameFilter:
    def __init__(self, name):
        self.name = name

    def matches(self, task):
        return self.name == task.name

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self.name == other.name

    def __str__(self, *args, **kwargs):
        return f"filter for task name [{self.name}]"


class TaskOpTypeFilter:
    def __init__(self, op_type_name):
        self.op_type = op_type_name

    def matches(self, task):
        return self.op_type == task.operation.type

    def __hash__(self):
        return hash(self.op_type)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self.op_type == other.op_type

    def __str__(self, *args, **kwargs):
        return f"filter for operation type [{self.op_type}]"


class TaskTagFilter:
    def __init__(self, tag_name):
        self.tag_name = tag_name

    def matches(self, task):
        return self.tag_name in task.tags

    def __hash__(self):
        return hash(self.tag_name)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self.tag_name == other.tag_name

    def __str__(self, *args, **kwargs):
        return f"filter for tasks tagged [{self.tag_name}]"


class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


# Schedule elements
class Parallel:
    def __init__(self, tasks, clients=None):
        self.tasks = tasks
        self._clients = clients
        self.nested = True

    @property
    def clients(self):
        if self._clients is not None:
            return self._clients
        else:
            num_clients = 0
            for task in self.tasks:
                num_clients += task.clients
            return num_clients

    def matches(self, task_filter):
        # a parallel element matches if any of its elements match
        for task in self.tasks:
            if task.matches(task_filter):
                return True
        return False

    def remove_task(self, task):
        self.tasks.remove(task)

    def __iter__(self):
        return iter(self.tasks)

    def __str__(self, *args, **kwargs):
        return "%d parallel tasks" % len(self.tasks)

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)

    def __hash__(self):
        return hash(self.tasks)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self.tasks == other.tasks


Throughput = collections.namedtuple("Throughput", ["value", "unit"])


class Task:
    THROUGHPUT_PATTERN = re.compile(r"(?P<value>(\d*\.)?\d+)\s(?P<unit>\w+/s)")
    IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST = ["non-fatal"]

    def __init__(
        self,
        name,
        operation,
        tags=None,
        meta_data=None,
        warmup_iterations=None,
        iterations=None,
        warmup_time_period=None,
        time_period=None,
        ramp_up_time_period=None,
        clients=1,
        completes_parent=False,
        any_completes_parent=False,
        schedule=None,
        params=None,
    ):
        self.name = name
        self.operation = operation
        if isinstance(tags, str):
            self.tags = [tags]
        elif tags:
            self.tags = tags
        else:
            self.tags = []
        self.meta_data = meta_data if meta_data else {}
        self.warmup_iterations = warmup_iterations
        self.iterations = iterations
        self.warmup_time_period = warmup_time_period
        self.time_period = time_period
        self.ramp_up_time_period = ramp_up_time_period
        self.clients = clients
        self.completes_parent = completes_parent
        self.any_completes_parent = any_completes_parent
        self.schedule = schedule
        self.params = params if params else {}
        self.nested = False

    def matches(self, task_filter):
        return task_filter.matches(self)

    @property
    def target_throughput(self):
        def numeric(v):
            # While booleans can be converted to a number (False -> 0, True -> 1), we don't want to allow that here
            return isinstance(v, numbers.Number) and not isinstance(v, bool)

        target_throughput = self.params.get("target-throughput")
        target_interval = self.params.get("target-interval")

        if target_interval is not None and target_throughput is not None:
            raise exceptions.InvalidSyntax(
                f"Task [{self}] specifies target-interval [{target_interval}] and "
                f"target-throughput [{target_throughput}] but only one of them is allowed."
            )

        value = None
        unit = "ops/s"

        if target_interval:
            if not numeric(target_interval):
                raise exceptions.InvalidSyntax(f"Target interval [{target_interval}] for task [{self}] must be numeric.")
            value = 1 / float(target_interval)
        elif target_throughput:
            if isinstance(target_throughput, str):
                matches = re.match(Task.THROUGHPUT_PATTERN, target_throughput)
                if matches:
                    value = float(matches.group("value"))
                    unit = matches.group("unit")
                else:
                    raise exceptions.InvalidSyntax(f"Task [{self}] specifies invalid target throughput [{target_throughput}].")
            elif numeric(target_throughput):
                value = float(target_throughput)
            else:
                raise exceptions.InvalidSyntax(f"Target throughput [{target_throughput}] for task [{self}] must be string or numeric.")

        if value:
            return Throughput(value, unit)
        else:
            return None

    @property
    def ignore_response_error_level(self):
        ignore_response_error_level = self.params.get("ignore-response-error-level")

        if ignore_response_error_level and ignore_response_error_level not in Task.IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST:
            raise exceptions.InvalidSyntax(
                f"Task [{self}] specifies ignore-response-error-level to [{ignore_response_error_level}] but "
                f"the only allowed values are [{','.join(Task.IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST)}]."
            )

        return ignore_response_error_level

    def error_behavior(self, default_error_behavior):
        """
        Returns the desired behavior when encountering errors during task execution.

        :param default_error_behavior: (str) the default error behavior for the benchmark
        :return: (str) prescribing error handling when a non-fatal error occurs:
            "abort": will fail when any error gets encountered
            "continue": will continue for non fatal errors
        """

        behavior = "continue"
        if default_error_behavior == "abort":
            if self.ignore_response_error_level != "non-fatal":
                behavior = "abort"

        return behavior

    def __hash__(self):
        # Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
        return (
            hash(self.name)
            ^ hash(self.operation)
            ^ hash(self.warmup_iterations)
            ^ hash(self.iterations)
            ^ hash(self.warmup_time_period)
            ^ hash(self.time_period)
            ^ hash(self.ramp_up_time_period)
            ^ hash(self.clients)
            ^ hash(self.schedule)
            ^ hash(self.completes_parent)
            ^ hash(self.any_completes_parent)
        )

    def __eq__(self, other):
        # Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
        return isinstance(other, type(self)) and (
            self.name,
            self.operation,
            self.warmup_iterations,
            self.iterations,
            self.warmup_time_period,
            self.time_period,
            self.ramp_up_time_period,
            self.clients,
            self.schedule,
            self.completes_parent,
            self.any_completes_parent,
        ) == (
            other.name,
            other.operation,
            other.warmup_iterations,
            other.iterations,
            other.warmup_time_period,
            other.time_period,
            other.ramp_up_time_period,
            other.clients,
            other.schedule,
            other.completes_parent,
            other.any_completes_parent,
        )

    def __iter__(self):
        return iter([self])

    def __str__(self, *args, **kwargs):
        return self.name

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)


class Operation:
    def __init__(self, name, operation_type, meta_data=None, params=None, param_source=None):
        if params is None:
            params = {}
        self.name = name
        self.meta_data = meta_data if meta_data else {}
        self.type = operation_type
        self.params = params
        self.param_source = param_source

    @property
    def include_in_reporting(self):
        return self.params.get("include-in-reporting", True)

    @property
    def run_on_serverless(self):
        return self.params.get("run-on-serverless", None)

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self.name == other.name

    def __str__(self, *args, **kwargs):
        return self.name

    def __repr__(self):
        r = []
        for prop, value in vars(self).items():
            r.append("%s = [%s]" % (prop, repr(value)))
        return ", ".join(r)
