# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import socket
import traceback

import thespian.actors
import thespian.system.messages.status

from esrally import exceptions, log
from esrally.utils import console, net


class BenchmarkFailure:
    """
    Indicates a failure in the benchmark execution due to an exception
    """

    def __init__(self, message, cause=None):
        self.message = message
        self.cause = cause


class BenchmarkCancelled:
    """
    Indicates that the benchmark has been cancelled (by the user).
    """


def parametrized(decorator):
    """

    Helper meta-decorator that allows us to provide parameters to a decorator.

    :param decorator: The decorator that should accept parameters.
    """

    def inner(*args, **kwargs):
        def g(f):
            return decorator(f, *args, **kwargs)

        return g

    return inner


@parametrized
def no_retry(f, actor_name):
    """

    Decorator intended for Thespian message handlers with the signature ``receiveMsg_$MSG_NAME(self, msg, sender)``. Thespian will
    assume that a message handler that raises an exception can be retried. It will then retry once and give up afterwards just leaving
    a trace of that in the actor system's internal log file. However, this is usually *not* what we want in Rally. If handling of a
    message fails we instead want to notify a node higher up in the actor hierarchy.

    We achieve that by sending a ``BenchmarkFailure`` message to the original sender. Note that this might as well be the current
    actor (e.g. when handling a ``Wakeup`` message). In that case the actor itself is responsible for forwarding the benchmark failure
    to its parent actor.

    Example usage:

    @no_retry("special forces actor")
    def receiveMsg_DefuseBomb(self, msg, sender):
        # might raise an exception
        pass

    If this message handler raises an exception, the decorator will turn it into a ``BenchmarkFailure`` message with its ``message``
    property set to "Error in special forces actor" which is returned to the original sender.

    :param f: The message handler. Does not need to passed directly, this is handled by the decorator infrastructure.
    :param actor_name: A human readable name of the current actor that should be used in the exception message.
    """

    def guard(self, msg, sender):
        # noinspection PyBroadException
        try:
            return f(self, msg, sender)
        except BaseException:
            # log here as the full trace might get lost.
            logging.getLogger(__name__).exception("Error in %s", actor_name)
            # don't forward the exception as is because the main process might not have this class available on the load path
            # and will fail then while deserializing the cause.
            self.send(sender, BenchmarkFailure(traceback.format_exc()))

    return guard


class RallyActor(thespian.actors.ActorTypeDispatcher):
    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        self.children = []
        self.received_responses = []
        self.status = None
        log.post_configure_actor_logging()
        self.logger = logging.getLogger(__name__)
        console.set_assume_tty(assume_tty=False)

    # The method name is required by the actor framework
    # noinspection PyPep8Naming
    @staticmethod
    def actorSystemCapabilityCheck(capabilities, requirements):
        for name, value in requirements.items():
            current = capabilities.get(name, None)
            if current != value:
                # A mismatch by is not a problem by itself as long as at least one actor system instance matches the requirements.
                return False
        return True

    def transition_when_all_children_responded(self, sender, msg, expected_status, new_status, transition):
        """

        Waits until all children have sent a specific response message and then transitions this actor to a new status.

        :param sender: The child actor that has responded.
        :param msg: The response message.
        :param expected_status: The status in which this actor should be upon calling this method.
        :param new_status: The new status once all child actors have responded.
        :param transition: A parameter-less function to call immediately after changing the status.
        """
        if self.is_current_status_expected(expected_status):
            self.received_responses.append(msg)
            response_count = len(self.received_responses)
            expected_count = len(self.children)

            self.logger.debug(
                "[%d] of [%d] child actors have responded for transition from [%s] to [%s].",
                response_count,
                expected_count,
                self.status,
                new_status,
            )
            if response_count == expected_count:
                self.logger.debug(
                    "All [%d] child actors have responded. Transitioning now from [%s] to [%s].", expected_count, self.status, new_status
                )
                # all nodes have responded, change status
                self.status = new_status
                self.received_responses = []
                transition()
            elif response_count > expected_count:
                raise exceptions.RallyAssertionError(
                    "Received [%d] responses but only [%d] were expected to transition from [%s] to [%s]. The responses are: %s"
                    % (response_count, expected_count, self.status, new_status, self.received_responses)
                )
        else:
            raise exceptions.RallyAssertionError(
                "Received [%s] from [%s] but we are in status [%s] instead of [%s]." % (type(msg), sender, self.status, expected_status)
            )

    def send_to_children_and_transition(self, sender, msg, expected_status, new_status):
        """

        Sends the provided message to all child actors and immediately transitions to the new status.

        :param sender: The actor from which we forward this message (in case it is message forwarding). Otherwise our own address.
        :param msg: The message to send.
        :param expected_status: The status in which this actor should be upon calling this method.
        :param new_status: The new status.
        """
        if self.is_current_status_expected(expected_status):
            self.logger.debug("Transitioning from [%s] to [%s].", self.status, new_status)
            self.status = new_status
            for m in filter(None, self.children):
                self.send(m, msg)
        else:
            raise exceptions.RallyAssertionError(
                "Received [%s] from [%s] but we are in status [%s] instead of [%s]." % (type(msg), sender, self.status, expected_status)
            )

    def is_current_status_expected(self, expected_status):
        # if we don't expect anything, we're always in the right status
        if not expected_status:
            return True
        # do an explicit check for a list here because strings are also iterable and we have very tight control over this code anyway.
        elif isinstance(expected_status, list):
            return self.status in expected_status
        else:
            return self.status == expected_status


def actor_system_already_running(ip="127.0.0.1"):
    """
    Determines whether an actor system is already running by opening a socket connection.

    Note: It may be possible that another system is running on the same port.
    """
    s = socket.socket()
    try:
        s.connect((ip, 1900))
        s.close()
        return True
    except Exception:
        return False


__SYSTEM_BASE = "multiprocTCPBase"


def use_offline_actor_system():
    global __SYSTEM_BASE
    __SYSTEM_BASE = "multiprocQueueBase"


def bootstrap_actor_system(try_join=False, prefer_local_only=False, local_ip=None, coordinator_ip=None):
    logger = logging.getLogger(__name__)
    system_base = __SYSTEM_BASE
    try:
        if try_join:
            if actor_system_already_running():
                logger.debug("Joining already running actor system with system base [%s].", system_base)
                return thespian.actors.ActorSystem(system_base)
            else:
                logger.debug("Creating new actor system with system base [%s] on coordinator node.", system_base)
                # if we try to join we can only run on the coordinator...
                return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities={"coordinator": True})
        elif prefer_local_only:
            coordinator = True
            if system_base != "multiprocQueueBase":
                coordinator_ip = "127.0.0.1"
                local_ip = "127.0.0.1"
            else:
                coordinator_ip = None
                local_ip = None
        else:
            if system_base not in ("multiprocTCPBase", "multiprocUDPBase"):
                raise exceptions.SystemSetupError("Rally requires a network-capable system base but got [%s]." % system_base)
            if not coordinator_ip:
                raise exceptions.SystemSetupError("coordinator IP is required")
            if not local_ip:
                raise exceptions.SystemSetupError("local IP is required")
            # always resolve the public IP here, even if a DNS name is given. Otherwise Thespian will be unhappy
            local_ip = net.resolve(local_ip)
            coordinator_ip = net.resolve(coordinator_ip)

            coordinator = local_ip == coordinator_ip

        capabilities = {"coordinator": coordinator}
        if local_ip:
            # just needed to determine whether to run benchmarks locally
            capabilities["ip"] = local_ip
        if coordinator_ip:
            # Make the coordinator node the convention leader
            capabilities["Convention Address.IPv4"] = "%s:1900" % coordinator_ip
        logger.info("Starting actor system with system base [%s] and capabilities [%s].", system_base, capabilities)
        return thespian.actors.ActorSystem(system_base, logDefs=log.load_configuration(), capabilities=capabilities)
    except thespian.actors.ActorSystemException:
        logger.exception("Could not initialize internal actor system.")
        raise
