# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import inspect
import logging
import random
import types
from abc import ABC, abstractmethod

import esrally.track
from esrally import exceptions

# Mapping from scheduler name to scheduler class (or factory)
__SCHEDULERS = {}

"""
The scheduler module defines an API to determine *when* Rally should issue a request. There are two types of schedulers:

# Simple Schedulers

A simple scheduler has the following signature::

    class MySimpleScheduler:
        def __init__(self, task, target_throughput):
            ...

        def next(self, current):
            ...

In ``__init__``, the current task and the precalculated target throughput (in requests per second) are provided.

The implementation of ``next`` receives the previous point in time in seconds (starting from zero) and needs to
return the next point in time at which Rally should issue a request.
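
A minimal sketch of such a simple scheduler (the class name and jitter factor are purely illustrative)::

    import random

    class JitteringScheduler:
        def __init__(self, task, target_throughput):
            # base wait time between requests in seconds
            self.wait_time = 1 / target_throughput

        def next(self, current):
            # add up to 10% random jitter on top of the deterministic wait time
            return current + self.wait_time * random.uniform(1.0, 1.1)

It could then be registered under a custom name via ``register_scheduler("jittering", JitteringScheduler)``.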

# Regular Schedulers

If a simple scheduler is not sufficient, a more complex API can be implemented::

    class MyScheduler:
        def __init__(self, task):
            ...

        def before_request(self, now):
            ...

        def after_request(self, now, weight, unit, request_meta_data):
            ...

        def next(self, current):
            ...

* In ``__init__`` the current task is provided. Implementations need to calculate the target throughput themselves based
  on the task's properties.
* ``before_request`` is invoked by Rally before a request is executed. ``now`` provides the current timestamp in seconds.
* Similarly, ``after_request`` is invoked by Rally after a response has been received. ``now`` is the current timestamp,
  ``weight``, ``unit`` and ``request_meta_data`` are passed from the respective runner. For a bulk request, Rally
  passes e.g. ``weight=5000, unit="docs"`` or for a search request ``weight=1, unit="ops"``. Note that when a request
  has finished with an error, the ``weight`` might be zero (depending on the type of error).
* ``next`` needs to behave identically to ``next`` in simple schedulers.

``before_request`` and ``after_request`` can be used to adjust the target throughput based on feedback from the runner.

If the scheduler also needs access to the parameter source, provide a ``parameter_source`` property. Rally injects the
task's parameter source into this property.
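
A sketch of such a feedback-driven scheduler (the class name, the ``initial-rate`` task parameter and the back-off
heuristic are illustrative assumptions, not Rally conventions)::

    class MyAdaptiveScheduler:
        def __init__(self, task):
            # derive the initial request rate from the task's own parameters (hypothetical parameter name)
            self.rate = task.params.get("initial-rate", 10)
            self.wait_time = 1 / self.rate

        def before_request(self, now):
            pass

        def after_request(self, now, weight, unit, request_meta_data):
            # back off when the runner signals an error; which meta-data keys are present depends on the runner
            if request_meta_data and request_meta_data.get("success") is False:
                self.wait_time = min(self.wait_time * 2, 1.0)

        def next(self, current):
            return current + self.wait_time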
"""


def scheduler_for(task: esrally.track.Task):
    """
    Creates a scheduler instance

    :param task: The current task for which a scheduler is needed.
    :return: An initialized scheduler instance.
    """
    logger = logging.getLogger(__name__)
    if run_unthrottled(task):
        return Unthrottled()

    schedule = task.schedule or DeterministicScheduler.name
    try:
        scheduler_class = __SCHEDULERS[schedule]
    except KeyError:
        raise exceptions.RallyError(f"No scheduler available for name [{schedule}]")

    # for backwards-compatibility - treat existing schedulers as top-level schedulers
    if is_legacy_scheduler(scheduler_class):
        logger.warning("Scheduler [%s] implements a deprecated API. Please adapt it.", scheduler_class)
        return LegacyWrappingScheduler(task, scheduler_class)
    elif is_simple_scheduler(scheduler_class):
        logger.debug("Treating [%s] for [%s] as a simple scheduler.", scheduler_class, task)
        return UnitAwareScheduler(task, scheduler_class)
    else:
        logger.debug("Treating [%s] for [%s] as non-simple scheduler.", scheduler_class, task)
        return scheduler_class(task)


def run_unthrottled(task):
    """
    Determines whether a task needs to run unthrottled. A task that doesn't specify a target throughput is only
    considered unthrottled if it specifies either no explicit schedule or one of the built-in schedules. Custom
    schedules might need more flexible approaches (e.g. because they simulate a throughput that changes according to a
    daily or weekly pattern) and thus specifying a target throughput might not always be appropriate for them.
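
    For example, a task with no target throughput and either no schedule or ``deterministic`` / ``poisson`` runs
    unthrottled, whereas a task with no target throughput but a custom schedule name is still handed to that scheduler.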
    """
    return task.target_throughput is None and task.schedule in [None, PoissonScheduler.name, DeterministicScheduler.name]


def is_legacy_scheduler(scheduler_class):
    """
    Determines whether a scheduler is a legacy implementation that gets passed task parameters instead of only the
    target throughput.
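
    A legacy scheduler declares, for example, ``def __init__(self, params)``; the presence of a ``params`` constructor
    argument is what marks it as legacy.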
    """
    constructor_params = inspect.signature(scheduler_class.__init__).parameters
    return len(constructor_params) >= 2 and "params" in constructor_params


def is_simple_scheduler(scheduler_class):
    """
    Determines whether a scheduler is a "simple" scheduler, i.e. it doesn't consider feedback from the runner.
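
    For example, a class that only implements ``next`` (such as ``DeterministicScheduler``) is simple, whereas a class
    that also implements ``before_request`` and ``after_request`` is not.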
    """
    methods = inspect.getmembers(scheduler_class, inspect.isfunction)
    method_names = [name for name, _ in methods]
    return not all(scheduler_method in method_names for scheduler_method in ["before_request", "after_request", "next"])


def register_scheduler(name, scheduler):
    """
    Registers a new scheduler. Attempting to register a scheduler with a name that is already taken will raise a ``SystemSetupError``.

    :param name: The name under which to register the scheduler.
    :param scheduler: Either a unary function ``float`` -> ``float`` or a class with the same interface as ``Scheduler``.
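
    Example (``MyScheduler`` being a hypothetical scheduler class)::

        register_scheduler("my-scheduler", MyScheduler)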

    """
    logger = logging.getLogger(__name__)
    if name in __SCHEDULERS:
        raise exceptions.SystemSetupError("A scheduler with the name [%s] is already registered." % name)
    # we'd rather use callable() but it would also match classes (which are callable too); we only want plain functions here
    if isinstance(scheduler, types.FunctionType):
        logger.warning("Function-based schedulers are deprecated. Please reimplement [%s] as a class.", str(scheduler))
        logger.debug("Registering function [%s] for [%s].", str(scheduler), str(name))

        # lazy initialize a delegating scheduler
        __SCHEDULERS[name] = lambda _: DelegatingScheduler(scheduler)
    else:
        logger.debug("Registering object [%s] for [%s].", str(scheduler), str(name))
        __SCHEDULERS[name] = scheduler


# Only intended for unit-testing!
def remove_scheduler(name):
    del __SCHEDULERS[name]


class SimpleScheduler(ABC):
    @abstractmethod
    def next(self, current): ...


class Scheduler(ABC):
    def before_request(self, now):
        pass

    def after_request(self, now, weight, unit, request_meta_data):
        pass

    @abstractmethod
    def next(self, current): ...


# Deprecated
class DelegatingScheduler(SimpleScheduler):
    """
    Delegates to a scheduler function and acts as an adapter to the rest of the system.
    """

    def __init__(self, delegate):
        super().__init__()
        self.delegate = delegate

    def next(self, current):
        return self.delegate(current)


# Deprecated
class LegacyWrappingScheduler(Scheduler):
    """
    Wraps legacy scheduler implementations to keep older schedulers compatible with the current scheduler API.
    """

    def __init__(self, task, legacy_scheduler_class):
        super().__init__()
        # the legacy API was based on parameters so only provide these
        self.legacy_scheduler = legacy_scheduler_class(task.params)

    def next(self, current):
        return self.legacy_scheduler.next(current)


class Unthrottled(Scheduler):
    """
    Rally-internal scheduler to handle unthrottled tasks.
    """

    def next(self, current):
        return 0

    def __str__(self):
        return "unthrottled"


class DeterministicScheduler(SimpleScheduler):
    """
    Schedules the next execution according to a
    `deterministic distribution <https://en.wikipedia.org/wiki/Degenerate_distribution>`_.
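
    For example, with a target throughput of 100 requests per second, ``next`` schedules each request exactly
    0.01 seconds after the previous one.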
    """

    name = "deterministic"

    def __init__(self, task, target_throughput):
        super().__init__()
        self.wait_time = 1 / target_throughput

    def next(self, current):
        return current + self.wait_time

    def __str__(self):
        return "deterministic scheduler"


class PoissonScheduler(SimpleScheduler):
    """
    Schedules the next execution according to a
    `Poisson distribution <https://en.wikipedia.org/wiki/Poisson_distribution>`_. A Poisson distribution models random
    independent arrivals of clients that, on average, match the expected arrival rate, which makes it suitable for
    modelling access in open systems.

    See also http://preshing.com/20111007/how-to-generate-random-timings-for-a-poisson-process/
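
    For example, with a target throughput of 100 requests per second, the inter-arrival times drawn from
    ``random.expovariate(100)`` average 0.01 seconds, but individual gaps vary randomly.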
    """

    name = "poisson"

    def __init__(self, task, target_throughput):
        super().__init__()
        self.rate = target_throughput

    def next(self, current):
        return current + random.expovariate(self.rate)

    def __str__(self):
        return "Poisson scheduler"


class UnitAwareScheduler(Scheduler):
    """
    Scheduler implementation that adjusts target throughput based on feedback from the runner. It delegates actual
    scheduling to the scheduler provided by the user in the track.
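
    For example, with a target throughput of 10,000 docs/s, 2 clients and bulk requests weighing 5,000 docs each, the
    per-client request rate handed to the delegate scheduler is 10000 / 2 / 5000 = 1 request per second.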

    """

    def __init__(self, task, scheduler_class):
        super().__init__()
        self.task = task
        self.first_request = True
        self.current_weight = None
        self.scheduler_class = scheduler_class
        # start unthrottled to avoid conditional logic on the hot code path
        self.scheduler = Unthrottled()

    def after_request(self, now, weight, unit, request_meta_data):
        if weight > 0 and (self.first_request or self.current_weight != weight):
            expected_unit = self.task.target_throughput.unit
            actual_unit = f"{unit}/s"
            if actual_unit != expected_unit:
                # *temporary* workaround to convert mismatching units to ops/s to stay backwards-compatible.
                #
                # This ensures that we throttle based on ops/s but report based on the original unit (as before).
                if expected_unit == "ops/s":
                    weight = 1
                    if self.first_request:
                        logging.getLogger(__name__).warning(
                            "Task [%s] throttles based on [%s] but reports [%s]. Please specify the target throughput in [%s] instead.",
                            self.task,
                            expected_unit,
                            actual_unit,
                            actual_unit,
                        )
                else:
                    raise exceptions.RallyAssertionError(
                        f"Target throughput for [{self.task}] is specified in "
                        f"[{expected_unit}] but the task throughput is measured "
                        f"in [{actual_unit}]."
                    )

            self.first_request = False
            self.current_weight = weight
            # throughput in requests/s for this client
            target_throughput = self.task.target_throughput.value / self.task.clients / self.current_weight
            self.scheduler = self.scheduler_class(self.task, target_throughput)

    def next(self, current):
        return self.scheduler.next(current)

    def __str__(self):
        return f"Unit-aware scheduler delegating to [{self.scheduler_class}]"


register_scheduler(DeterministicScheduler.name, DeterministicScheduler)
register_scheduler(PoissonScheduler.name, PoissonScheduler)
