# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import glob
import json
import logging
import os
import shutil
import uuid

import jinja2

from esrally import exceptions, types
from esrally.mechanic import java_resolver, team
from esrally.utils import console, convert, io, process


def local(cfg: types.Config, car, plugins, ip, http_port, all_node_ips, all_node_names, target_root, node_name):
    distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
    cluster_name = cfg.opts("mechanic", "cluster.name")

    node_root_dir = os.path.join(target_root, node_name)

    runtime_jdk_bundled = convert.to_bool(car.mandatory_var("runtime.jdk.bundled"))
    runtime_jdk = car.mandatory_var("runtime.jdk")
    _, java_home = java_resolver.java_home(runtime_jdk, cfg.opts("mechanic", "runtime.jdk"), runtime_jdk_bundled)

    es_installer = ElasticsearchInstaller(
        car, java_home, node_name, cluster_name, node_root_dir, all_node_ips, all_node_names, ip, http_port
    )
    plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins]

    return BareProvisioner(es_installer, plugin_installers, distribution_version=distribution_version)


def docker(cfg: types.Config, car, ip, http_port, target_root, node_name):
    distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
    cluster_name = cfg.opts("mechanic", "cluster.name")
    rally_root = cfg.opts("node", "rally.root")

    node_root_dir = os.path.join(target_root, node_name)

    return DockerProvisioner(car, node_name, cluster_name, ip, http_port, node_root_dir, distribution_version, rally_root)


class NodeConfiguration:
    def __init__(self, build_type, car_runtime_jdks, car_provides_bundled_jdk, ip, node_name, node_root_path, binary_path, data_paths):
        self.build_type = build_type
        self.car_runtime_jdks = car_runtime_jdks
        self.car_provides_bundled_jdk = car_provides_bundled_jdk
        self.ip = ip
        self.node_name = node_name
        self.node_root_path = node_root_path
        self.binary_path = binary_path
        self.data_paths = data_paths

    def as_dict(self):
        return {
            "build-type": self.build_type,
            "car-runtime-jdks": self.car_runtime_jdks,
            "car-provides-bundled-jdk": self.car_provides_bundled_jdk,
            "ip": self.ip,
            "node-name": self.node_name,
            "node-root-path": self.node_root_path,
            "binary-path": self.binary_path,
            "data-paths": self.data_paths,
        }

    @staticmethod
    def from_dict(d):
        return NodeConfiguration(
            d["build-type"],
            d["car-runtime-jdks"],
            d["car-provides-bundled-jdk"],
            d["ip"],
            d["node-name"],
            d["node-root-path"],
            d["binary-path"],
            d["data-paths"],
        )


def save_node_configuration(path, n):
    with open(os.path.join(path, "node-config.json"), "w") as f:
        json.dump(n.as_dict(), f, indent=2)


def load_node_configuration(path):
    with open(os.path.join(path, "node-config.json")) as f:
        return NodeConfiguration.from_dict(json.load(f))


class ConfigLoader:
    def __init__(self):
        pass

    def load(self):
        pass


def _render_template(env, variables, file_name):
    try:
        template = env.get_template(io.basename(file_name))
        # force a new line at the end. Jinja seems to remove it.
        return template.render(variables) + "\n"
    except jinja2.exceptions.TemplateSyntaxError as e:
        raise exceptions.InvalidSyntax("%s in %s" % (str(e), file_name))
    except BaseException as e:
        raise exceptions.SystemSetupError("%s in %s" % (str(e), file_name))


def plain_text(file):
    _, ext = io.splitext(file)
    return ext in [".ini", ".txt", ".json", ".yml", ".yaml", ".options", ".properties"]


def cleanup(preserve, install_dir, data_paths):
    def delete_path(p):
        if os.path.exists(p):
            try:
                logger.debug("Deleting [%s].", p)
                shutil.rmtree(p)
            except OSError:
                logger.exception("Could not delete [%s]. Skipping...", p)

    logger = logging.getLogger(__name__)
    if preserve:
        console.info(f"Preserving benchmark candidate installation at [{install_dir}].", logger=logger)
    else:
        logger.info("Wiping benchmark candidate installation at [%s].", install_dir)
        for path in data_paths:
            delete_path(path)

        delete_path(install_dir)


def _apply_config(source_root_path, target_root_path, config_vars):
    logger = logging.getLogger(__name__)
    for root, _, files in os.walk(source_root_path):
        env = jinja2.Environment(loader=jinja2.FileSystemLoader(root))

        relative_root = root[len(source_root_path) + 1 :]
        absolute_target_root = os.path.join(target_root_path, relative_root)
        io.ensure_dir(absolute_target_root)

        for name in files:
            source_file = os.path.join(root, name)
            target_file = os.path.join(absolute_target_root, name)
            if plain_text(source_file):
                logger.info("Reading config template file [%s] and writing to [%s].", source_file, target_file)
                # automatically merge config snippets from plugins (e.g. if they want to add config to elasticsearch.yml)
                with open(target_file, mode="a", encoding="utf-8") as f:
                    f.write(_render_template(env, config_vars, source_file))
            else:
                logger.info("Treating [%s] as binary and copying as is to [%s].", source_file, target_file)
                shutil.copy(source_file, target_file)


class BareProvisioner:
    """
    The provisioner prepares the runtime environment for running the benchmark. It prepares all configuration files and copies the binary
    of the benchmark candidate to the appropriate place.
    """

    def __init__(self, es_installer, plugin_installers, distribution_version=None, apply_config=_apply_config):
        self.es_installer = es_installer
        self.plugin_installers = plugin_installers
        self.distribution_version = distribution_version
        self.apply_config = apply_config
        self.logger = logging.getLogger(__name__)

    def prepare(self, binary):
        self.es_installer.install(binary["elasticsearch"])
        # we need to immediately delete it as plugins may copy their configuration during installation.
        self.es_installer.delete_pre_bundled_configuration()

        # determine after installation because some variables will depend on the install directory
        target_root_path = self.es_installer.es_home_path
        provisioner_vars = self._provisioner_variables()
        for p in self.es_installer.config_source_paths:
            self.apply_config(p, target_root_path, provisioner_vars)

        for installer in self.plugin_installers:
            installer.install(target_root_path, binary.get(installer.plugin_name))
            for plugin_config_path in installer.config_source_paths:
                self.apply_config(plugin_config_path, target_root_path, provisioner_vars)

        # Never let install hooks modify our original provisioner variables and just provide a copy!
        self.es_installer.invoke_install_hook(team.BootstrapPhase.post_install, provisioner_vars.copy())
        for installer in self.plugin_installers:
            installer.invoke_install_hook(team.BootstrapPhase.post_install, provisioner_vars.copy())

        return NodeConfiguration(
            "tar",
            self.es_installer.car.mandatory_var("runtime.jdk"),
            convert.to_bool(self.es_installer.car.mandatory_var("runtime.jdk.bundled")),
            self.es_installer.node_ip,
            self.es_installer.node_name,
            self.es_installer.node_root_dir,
            self.es_installer.es_home_path,
            self.es_installer.data_paths,
        )

    def _provisioner_variables(self):
        plugin_variables = {}
        mandatory_plugins = []
        for installer in self.plugin_installers:
            plugin_variables.update(installer.variables)
            if installer.plugin.moved_to_module:
                self.logger.info(
                    "Skipping adding plugin [%s] to cluster setting 'plugin.mandatory' as it has been moved to a module",
                    installer.plugin_name,
                )
            else:
                mandatory_plugins.append(installer.plugin_name)

        cluster_settings = {}
        if mandatory_plugins:
            # as a safety measure, prevent the cluster to startup if something went wrong during plugin installation which
            # we did not detect already here. This ensures we fail fast.
            #
            # https://www.elastic.co/guide/en/elasticsearch/plugins/current/_plugins_directory.html#_mandatory_plugins
            cluster_settings["plugin.mandatory"] = mandatory_plugins

        provisioner_vars = {}
        provisioner_vars.update(self.es_installer.variables)
        provisioner_vars.update(plugin_variables)
        provisioner_vars["cluster_settings"] = cluster_settings

        return provisioner_vars


class ElasticsearchInstaller:
    def __init__(
        self,
        car,
        java_home,
        node_name,
        cluster_name,
        node_root_dir,
        all_node_ips,
        all_node_names,
        ip,
        http_port,
        hook_handler_class=team.BootstrapHookHandler,
    ):
        self.car = car
        self.java_home = java_home
        self.node_name = node_name
        self.cluster_name = cluster_name
        self.node_root_dir = node_root_dir
        self.install_dir = os.path.join(node_root_dir, "install")
        self.node_log_dir = os.path.join(node_root_dir, "logs", "server")
        self.heap_dump_dir = os.path.join(node_root_dir, "heapdump")
        self.all_node_ips = all_node_ips
        self.all_node_names = all_node_names
        self.node_ip = ip
        self.http_port = http_port
        self.hook_handler = hook_handler_class(self.car)
        if self.hook_handler.can_load():
            self.hook_handler.load()
        self.es_home_path = None
        self.data_paths = None
        self.logger = logging.getLogger(__name__)

    def install(self, binary):
        self.logger.info("Preparing candidate locally in [%s].", self.install_dir)
        io.ensure_dir(self.install_dir)
        io.ensure_dir(self.node_log_dir)
        io.ensure_dir(self.heap_dump_dir)

        self.logger.info("Unzipping %s to %s", binary, self.install_dir)
        io.decompress(binary, self.install_dir)
        self.es_home_path = glob.glob(os.path.join(self.install_dir, "elasticsearch*"))[0]
        self.data_paths = self._data_paths()

    def delete_pre_bundled_configuration(self):
        # TODO remove the below ignore when introducing type hints
        config_path = os.path.join(self.es_home_path, "config")  # type: ignore[arg-type]
        self.logger.info("Deleting pre-bundled Elasticsearch configuration at [%s]", config_path)
        shutil.rmtree(config_path)

    def invoke_install_hook(self, phase, variables):
        env = {}
        if self.java_home:
            env["JAVA_HOME"] = self.java_home
        self.hook_handler.invoke(phase.name, variables=variables, env=env)

    @property
    def variables(self):
        # bind as specifically as possible
        network_host = self.node_ip

        defaults = {
            "cluster_name": self.cluster_name,
            "node_name": self.node_name,
            "data_paths": self.data_paths,
            "log_path": self.node_log_dir,
            "heap_dump_path": self.heap_dump_dir,
            # this is the node's IP address as specified by the user when invoking Rally
            "node_ip": self.node_ip,
            # this is the IP address that the node will be bound to. Rally will bind to the node's IP address (but not to 0.0.0.0). The
            # reason is that we use the node's IP address as subject alternative name in x-pack.
            "network_host": network_host,
            "http_port": str(self.http_port),
            "transport_port": str(self.http_port + 100),
            "all_node_ips": '["%s"]' % '","'.join(self.all_node_ips),
            "all_node_names": '["%s"]' % '","'.join(self.all_node_names),
            # at the moment we are strict and enforce that all nodes are master eligible nodes
            "minimum_master_nodes": len(self.all_node_ips),
            "install_root_path": self.es_home_path,
        }
        variables = {}
        variables.update(self.car.variables)
        variables.update(defaults)
        return variables

    @property
    def config_source_paths(self):
        return self.car.config_paths

    def _data_paths(self):
        if "data_paths" in self.car.variables:
            data_paths = self.car.variables["data_paths"]
            if isinstance(data_paths, str):
                return [data_paths]
            elif isinstance(data_paths, list):
                return data_paths
            else:
                raise exceptions.SystemSetupError("Expected [data_paths] to be either a string or a list but was [%s]." % type(data_paths))
        else:
            # TODO remove the below ignore when introducing type hints
            return [os.path.join(self.es_home_path, "data")]  # type: ignore[arg-type]


class PluginInstaller:
    def __init__(self, plugin, java_home, hook_handler_class=team.BootstrapHookHandler):
        self.plugin = plugin
        self.java_home = java_home
        self.hook_handler = hook_handler_class(self.plugin)
        if self.hook_handler.can_load():
            self.hook_handler.load()
        self.logger = logging.getLogger(__name__)

    def install(self, es_home_path, plugin_url=None):
        installer_binary_path = os.path.join(es_home_path, "bin", "elasticsearch-plugin")
        if plugin_url:
            self.logger.info("Installing [%s] into [%s] from [%s]", self.plugin_name, es_home_path, plugin_url)
            install_cmd = '%s install --batch "%s"' % (installer_binary_path, plugin_url)
        else:
            self.logger.info("Installing [%s] into [%s]", self.plugin_name, es_home_path)
            install_cmd = '%s install --batch "%s"' % (installer_binary_path, self.plugin_name)

        return_code = process.run_subprocess_with_logging(install_cmd, env=self.env())
        # see: https://www.elastic.co/guide/en/elasticsearch/plugins/current/_other_command_line_parameters.html
        if return_code == 0:
            self.logger.info("Successfully installed [%s].", self.plugin_name)
        elif return_code == 64:
            # most likely this is an unknown plugin
            raise exceptions.SystemSetupError("Unknown plugin [%s]" % self.plugin_name)
        elif return_code == 74:
            raise exceptions.SupplyError("I/O error while trying to install [%s]" % self.plugin_name)
        else:
            raise exceptions.RallyError(
                "Unknown error while trying to install [%s] (installer return code [%s]). Please check the logs."
                % (self.plugin_name, str(return_code))
            )

    def invoke_install_hook(self, phase, variables):
        self.hook_handler.invoke(phase.name, variables=variables, env=self.env())

    def env(self):
        env = {}
        if self.java_home:
            env["JAVA_HOME"] = self.java_home
        return env

    @property
    def variables(self):
        return self.plugin.variables

    @property
    def config_source_paths(self):
        return self.plugin.config_paths

    @property
    def plugin_name(self):
        return self.plugin.name

    @property
    def sub_plugin_name(self):
        # if a plugin consists of multiple plugins (e.g. x-pack) we're interested in that name
        return self.variables.get("plugin_name", self.plugin_name)


class DockerProvisioner:
    def __init__(self, car, node_name, cluster_name, ip, http_port, node_root_dir, distribution_version, rally_root):
        self.car = car
        self.node_name = node_name
        self.cluster_name = cluster_name
        self.node_ip = ip
        self.http_port = http_port
        self.node_root_dir = node_root_dir
        self.node_log_dir = os.path.join(node_root_dir, "logs", "server")
        self.heap_dump_dir = os.path.join(node_root_dir, "heapdump")
        self.distribution_version = distribution_version
        self.rally_root = rally_root
        self.binary_path = os.path.join(node_root_dir, "install")
        # use a random subdirectory to isolate multiple runs because an external (non-root) user cannot clean it up.
        self.data_paths = [os.path.join(node_root_dir, "data", str(uuid.uuid4()))]
        self.logger = logging.getLogger(__name__)

        provisioner_defaults = {
            "cluster_name": self.cluster_name,
            "node_name": self.node_name,
            # we bind-mount the directories below on the host to these ones.
            "install_root_path": "/usr/share/elasticsearch",
            "data_paths": ["/usr/share/elasticsearch/data"],
            "log_path": "/var/log/elasticsearch",
            "heap_dump_path": "/usr/share/elasticsearch/heapdump",
            # Docker container needs to expose service on external interfaces
            "network_host": "0.0.0.0",
            "discovery_type": "single-node",
            "http_port": str(self.http_port),
            "transport_port": str(self.http_port + 100),
            "cluster_settings": {},
        }

        self.config_vars = {}
        self.config_vars.update(self.car.variables)
        self.config_vars.update(provisioner_defaults)

    def prepare(self, binary):
        # we need to allow other users to write to these directories due to Docker.
        #
        # Although os.mkdir passes 0o777 by default, mkdir(2) uses `mode & ~umask & 0777` to determine the final flags and
        # hence we need to modify the process' umask here. For details see https://linux.die.net/man/2/mkdir.
        previous_umask = os.umask(0)
        try:
            io.ensure_dir(self.binary_path)
            io.ensure_dir(self.node_log_dir)
            io.ensure_dir(self.heap_dump_dir)
            io.ensure_dir(self.data_paths[0])
        finally:
            os.umask(previous_umask)

        mounts = {}

        for car_config_path in self.car.config_paths:
            for root, _, files in os.walk(car_config_path):
                env = jinja2.Environment(loader=jinja2.FileSystemLoader(root))

                relative_root = root[len(car_config_path) + 1 :]
                absolute_target_root = os.path.join(self.binary_path, relative_root)
                io.ensure_dir(absolute_target_root)

                for name in files:
                    source_file = os.path.join(root, name)
                    target_file = os.path.join(absolute_target_root, name)
                    mounts[target_file] = os.path.join("/usr/share/elasticsearch", relative_root, name)
                    if plain_text(source_file):
                        self.logger.info("Reading config template file [%s] and writing to [%s].", source_file, target_file)
                        with open(target_file, mode="a", encoding="utf-8") as f:
                            f.write(_render_template(env, self.config_vars, source_file))
                    else:
                        self.logger.info("Treating [%s] as binary and copying as is to [%s].", source_file, target_file)
                        shutil.copy(source_file, target_file)

        docker_cfg = self._render_template_from_file(self.docker_vars(mounts))
        self.logger.info("Starting Docker container with configuration:\n%s", docker_cfg)

        with open(os.path.join(self.binary_path, "docker-compose.yml"), mode="w", encoding="utf-8") as f:
            f.write(docker_cfg)

        return NodeConfiguration(
            "docker",
            self.car.mandatory_var("runtime.jdk"),
            convert.to_bool(self.car.mandatory_var("runtime.jdk.bundled")),
            self.node_ip,
            self.node_name,
            self.node_root_dir,
            self.binary_path,
            self.data_paths,
        )

    def docker_vars(self, mounts):
        v = {
            "es_version": self.distribution_version,
            "docker_image": self.car.mandatory_var("docker_image"),
            "node_ip": self.node_ip,
            "http_port": self.http_port,
            "es_data_dir": self.data_paths[0],
            "es_log_dir": self.node_log_dir,
            "es_heap_dump_dir": self.heap_dump_dir,
            "mounts": mounts,
        }
        self._add_if_defined_for_car(v, "docker_mem_limit")
        self._add_if_defined_for_car(v, "docker_cpu_count")
        return v

    def _add_if_defined_for_car(self, variables, key):
        if key in self.car.variables:
            variables[key] = self.car.variables[key]

    def _render_template(self, loader, template_name, variables):
        try:
            env = jinja2.Environment(loader=loader)
            for k, v in variables.items():
                env.globals[k] = v
            template = env.get_template(template_name)

            return template.render()
        except jinja2.exceptions.TemplateSyntaxError as e:
            raise exceptions.InvalidSyntax("%s in %s" % (str(e), template_name))
        except BaseException as e:
            raise exceptions.SystemSetupError("%s in %s" % (str(e), template_name))

    def _render_template_from_file(self, variables):
        compose_file = os.path.join(self.rally_root, "resources", "docker-compose.yml.j2")
        return self._render_template(
            loader=jinja2.FileSystemLoader(io.dirname(compose_file)), template_name=io.basename(compose_file), variables=variables
        )
