# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
import getpass
import glob
import grp
import logging
import os
import shutil
import urllib.error

import docker
from esrally import PROGRAM_NAME, exceptions, paths, types
from esrally.exceptions import BuildError, SystemSetupError
from esrally.utils import console, convert, git, io, jvm, net, process, sysstats

DEFAULT_ELASTICSEARCH_BRANCH = "main"
DEFAULT_PLUGIN_BRANCH = "main"


def create(cfg: types.Config, sources, distribution, car, plugins=None):
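    """
    Creates a composite supplier that provides all requested artifacts, i.e. Elasticsearch
    itself and any plugins, either from sources (optionally built via Docker) or from an
    official distribution. Source artifacts may also be served from a local artifact cache.
    """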
    logger = logging.getLogger(__name__)
    if plugins is None:
        plugins = []
    caching_enabled = cfg.opts("source", "cache", mandatory=False, default_value=True)
    revisions = _extract_revisions(cfg.opts("mechanic", "source.revision", mandatory=sources))
    source_build_method = cfg.opts("mechanic", "source.build.method", mandatory=False, default_value="default")
    distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
    supply_requirements = _supply_requirements(sources, distribution, plugins, revisions, distribution_version)
    build_needed = any(build for _, _, build in supply_requirements.values())
    es_supplier_type, es_version, _ = supply_requirements["elasticsearch"]
    src_config = cfg.all_opts("source")
    suppliers = []

    target_os = cfg.opts("mechanic", "target.os", mandatory=False)
    target_arch = cfg.opts("mechanic", "target.arch", mandatory=False)
    template_renderer = TemplateRenderer(version=es_version, os_name=target_os, arch=target_arch)

    if build_needed:
        es_src_dir = os.path.join(_src_dir(cfg), _config_value(src_config, "elasticsearch.src.subdir"))

        if source_build_method == "docker":
            builder = DockerBuilder(src_dir=es_src_dir, log_dir=paths.logs(), client=docker.from_env())
        else:
            raw_build_jdk = car.mandatory_var("build.jdk")
            try:
                build_jdk = int(raw_build_jdk)
            except ValueError:
                raise exceptions.SystemSetupError(f"Car config key [build.jdk] is invalid: [{raw_build_jdk}] (must be int)")
            builder = Builder(
                build_jdk=build_jdk,
                src_dir=es_src_dir,
                log_dir=paths.logs(),
            )

    else:
        builder = None

    distributions_root = os.path.join(cfg.opts("node", "root.dir"), cfg.opts("source", "distribution.dir"))
    dist_cfg = {}
    # car / plugin defines defaults...
    dist_cfg.update(car.variables)
    for plugin in plugins:
        for k, v in plugin.variables.items():
            dist_cfg[f"plugin_{plugin.name}_{k}"] = v
    # ... but the user can override it in rally.ini
    dist_cfg.update(cfg.all_opts("distributions"))

    if caching_enabled:
        logger.info("Enabling source artifact caching.")
        max_age_days = int(cfg.opts("source", "cache.days", mandatory=False, default_value=7))
        if max_age_days <= 0:
            raise exceptions.SystemSetupError(f"cache.days must be a positive number but is {max_age_days}")

        source_distributions_root = os.path.join(distributions_root, "src")
        _prune(source_distributions_root, max_age_days)
    else:
        logger.info("Disabling source artifact caching.")
        source_distributions_root = None

    if es_supplier_type == "source":
        es_src_dir = os.path.join(_src_dir(cfg), _config_value(src_config, "elasticsearch.src.subdir"))

        source_supplier = ElasticsearchSourceSupplier(
            revision=es_version,
            es_src_dir=es_src_dir,
            remote_url=cfg.opts("source", "remote.repo.url"),
            car=car,
            builder=builder,
            template_renderer=template_renderer,
        )

        if caching_enabled:
            es_file_resolver = ElasticsearchFileNameResolver(dist_cfg, template_renderer)
            source_supplier = CachedSourceSupplier(source_distributions_root, source_supplier, es_file_resolver)

        suppliers.append(source_supplier)
        repo = None
    else:
        es_src_dir = None
        repo = DistributionRepository(
            name=cfg.opts("mechanic", "distribution.repository"), distribution_config=dist_cfg, template_renderer=template_renderer
        )
        # TODO remove the below ignore when introducing type hints
        suppliers.append(ElasticsearchDistributionSupplier(repo, es_version, distributions_root))  # type: ignore[arg-type]

    for plugin in plugins:
        if plugin.moved_to_module:
            # TODO: https://github.com/elastic/rally/issues/1622
            # If it is listed as a core plugin (i.e. a user has overridden the team's path or revision), then we will build it.
            # We still need to use the post-install hooks to configure the keystore, so don't remove it from the list of plugins.
            logger.info("Plugin [%s] is now an Elasticsearch module and no longer needs to be built from source.", plugin.name)
            continue

        supplier_type, plugin_version, _ = supply_requirements[plugin.name]

        if supplier_type == "source":
            if CorePluginSourceSupplier.can_handle(plugin):
                logger.info("Adding core plugin source supplier for [%s].", plugin.name)
                assert es_src_dir is not None, f"Cannot build core plugin {plugin.name} when Elasticsearch is not built from source."
                plugin_supplier = CorePluginSourceSupplier(plugin, es_src_dir, builder)
            elif ExternalPluginSourceSupplier.can_handle(plugin):
                logger.info("Adding external plugin source supplier for [%s].", plugin.name)
                plugin_supplier = ExternalPluginSourceSupplier(
                    plugin,
                    plugin_version,
                    _src_dir(cfg, mandatory=False),
                    src_config,
                    Builder(
                        src_dir=es_src_dir,
                        log_dir=paths.logs(),
                    ),
                )
            else:
                raise exceptions.RallyError(
                    "Plugin %s can neither be treated as core nor as external plugin. Requirements: %s"
                    % (plugin.name, supply_requirements[plugin.name])
                )

            if caching_enabled:
                plugin_file_resolver = PluginFileNameResolver(plugin.name, plugin_version)
                plugin_supplier = CachedSourceSupplier(source_distributions_root, plugin_supplier, plugin_file_resolver)
            suppliers.append(plugin_supplier)  # type: ignore[arg-type]  # TODO remove this ignore when introducing type hints
        else:
            logger.info("Adding plugin distribution supplier for [%s].", plugin.name)
            assert repo is not None, "Cannot benchmark plugin %s from a distribution version but Elasticsearch from sources" % plugin.name
            # TODO remove the below ignore when introducing type hints
            suppliers.append(PluginDistributionSupplier(repo, plugin))  # type: ignore[arg-type]

    return CompositeSupplier(suppliers)


def _required_version(version):
    if not version or version.strip() == "":
        raise exceptions.SystemSetupError(
            "Could not determine version. Please specify the Elasticsearch distribution "
            "to download with the command line parameter --distribution-version."
        )
    return version


def _required_revision(revisions, key, name=None):
    try:
        return revisions[key]
    except KeyError:
        n = name if name is not None else key
        raise exceptions.SystemSetupError("No revision specified for %s" % n)


def _supply_requirements(sources, distribution, plugins, revisions, distribution_version):
    # per artifact (elasticsearch or a specific plugin):
    #   * key: artifact
    #   * value: ("source" | "distribution", distribution_version | revision, build = True | False)
    supply_requirements = {}

    # can only build Elasticsearch with source-related pipelines -> ignore revision in that case
    if "elasticsearch" in revisions and sources:
        supply_requirements["elasticsearch"] = ("source", _required_revision(revisions, "elasticsearch", "Elasticsearch"), True)
    else:
        # no revision given or explicitly specified that it's from a distribution -> must use a distribution
        supply_requirements["elasticsearch"] = ("distribution", _required_version(distribution_version), False)

    for plugin in plugins:
        if plugin.moved_to_module:
            # TODO: https://github.com/elastic/rally/issues/1622
            continue
        elif plugin.core_plugin:
            # core plugins are entirely dependent upon Elasticsearch.
            supply_requirements[plugin.name] = supply_requirements["elasticsearch"]
        else:
            # allow catch-all only if we're generally building from sources. If sources and distributions are mixed,
            # the user must specify revisions explicitly.
            if plugin.name in revisions or ("all" in revisions and sources):
                # be a bit more lenient when checking for plugin revisions. This allows users to specify `--revision="current"` and
                # rely on Rally to do the right thing.
                try:
                    plugin_revision = revisions[plugin.name]
                except KeyError:
                    # maybe we can use the catch-all revision (only if it's not a git revision)
                    plugin_revision = revisions.get("all")
                    if not plugin_revision or SourceRepository.is_commit_hash(plugin_revision):
                        raise exceptions.SystemSetupError("No revision specified for plugin [%s]." % plugin.name)
                    logging.getLogger(__name__).info(
                        "Revision for [%s] is not explicitly defined. Using catch-all revision [%s].", plugin.name, plugin_revision
                    )
                supply_requirements[plugin.name] = ("source", plugin_revision, True)
            else:
                supply_requirements[plugin.name] = ("distribution", _required_version(distribution_version), False)
    return supply_requirements


def _src_dir(cfg: types.Config, mandatory=True):
    # Don't let this spread across the whole module
    try:
        return cfg.opts("node", "src.root.dir", mandatory=mandatory)
    except exceptions.ConfigError:
        raise exceptions.SystemSetupError(
            "You cannot benchmark Elasticsearch from sources. Did you install Gradle? Please install"
            " all prerequisites and reconfigure Rally with %s configure" % PROGRAM_NAME
        )


def _prune(root_path, max_age_days):
    """
    Removes files that are older than ``max_age_days`` from ``root_path``. Subdirectories are not traversed.

    :param root_path: A directory which should be checked.
    :param max_age_days: Files that have been created more than ``max_age_days`` ago are deleted.
    """
    logger = logging.getLogger(__name__)
    if not os.path.exists(root_path):
        logger.info("[%s] does not exist. Skipping pruning.", root_path)
        return

    for f in os.listdir(root_path):
        artifact = os.path.join(root_path, f)
        if os.path.isfile(artifact):
            max_age = datetime.datetime.now() - datetime.timedelta(days=max_age_days)
            try:
                created_at = datetime.datetime.fromtimestamp(os.lstat(artifact).st_ctime)
                if created_at < max_age:
                    logger.info("Deleting [%s] from artifact cache (reached max age).", f)
                    os.remove(artifact)
                else:
                    logger.debug("Keeping [%s] (max age not yet reached)", f)
            except OSError:
                logger.exception("Could not check whether [%s] needs to be deleted from artifact cache.", artifact)
        else:
            logger.info("Skipping [%s] (not a file).", artifact)


class TemplateRenderer:
    def __init__(self, version, os_name=None, arch=None):
        self.version = version
        if os_name is not None:
            self.os = os_name
        else:
            self.os = sysstats.os_name().lower()
        if arch is not None:
            self.arch = arch
        else:
            derived_arch = sysstats.cpu_arch().lower()
            # Elasticsearch artifacts for Apple Silicon use "aarch64" as the CPU architecture
            self.arch = "aarch64" if derived_arch == "arm64" else derived_arch

    def render(self, template):
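        # e.g. (illustrative): with version="8.8.0", os="linux" and arch="x86_64", the template
        # "elasticsearch-{{VERSION}}-{{OSNAME}}-{{ARCH}}.tar.gz" renders to
        # "elasticsearch-8.8.0-linux-x86_64.tar.gz".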
        substitutions = {"{{VERSION}}": self.version, "{{OSNAME}}": self.os, "{{ARCH}}": self.arch}
        r = template
        for key, replacement in substitutions.items():
            r = r.replace(key, str(replacement))
        return r


class CompositeSupplier:
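    """
    Runs all registered suppliers through their fetch, prepare, and add phases and collects
    the resulting binaries in a single dict keyed by artifact name (e.g. "elasticsearch"
    or a plugin name).
    """
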
    def __init__(self, suppliers):
        self.suppliers = suppliers

    def __call__(self, *args, **kwargs):
        binaries = {}
        for supplier in self.suppliers:
            supplier.fetch()
        for supplier in self.suppliers:
            supplier.prepare()
        for supplier in self.suppliers:
            supplier.add(binaries)
        return binaries


class ElasticsearchFileNameResolver:
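    """
    Resolves the cache file name for Elasticsearch source builds from the release URL
    templates in the distribution config (e.g. the last path segment of the rendered
    "jdk.bundled.release_url" value).
    """
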
    def __init__(self, distribution_config, template_renderer):
        self.cfg = distribution_config
        self.runtime_jdk_bundled = convert.to_bool(self.cfg.get("runtime.jdk.bundled", False))
        self.template_renderer = template_renderer

    @property
    def revision(self):
        return self.template_renderer.version

    @revision.setter
    def revision(self, revision):
        self.template_renderer.version = revision

    @property
    def file_name(self):
        if self.runtime_jdk_bundled:
            url_key = "jdk.bundled.release_url"
        else:
            url_key = "jdk.unbundled.release_url"
        url = self.template_renderer.render(self.cfg[url_key])
        return url[url.rfind("/") + 1 :]

    @property
    def artifact_key(self):
        return "elasticsearch"

    def to_artifact_path(self, file_system_path):
        return file_system_path

    def to_file_system_path(self, artifact_path):
        return artifact_path


class CachedSourceSupplier:
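    """
    Wraps a source supplier and serves artifacts from the local artifact cache when
    possible, skipping the fetch and build phases for artifacts that are already cached.
    """
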
    def __init__(self, distributions_root, source_supplier, file_resolver):
        self.distributions_root = distributions_root
        self.source_supplier = source_supplier
        self.file_resolver = file_resolver
        self.cached_path = None
        self.logger = logging.getLogger(__name__)

    @property
    def file_name(self):
        return self.file_resolver.file_name

    @property
    def cached(self):
        return self.cached_path is not None and os.path.exists(self.cached_path)

    def fetch(self):
        # Can we already resolve the artifact without fetching the source tree at all? This is the case when a specific
        # revision (instead of a meta-revision like "current") is provided and the artifact is already cached. This is
        # also needed if an external process pushes artifacts to Rally's cache which might have been built from a
        # fork. In that case, the provided commit hash would never be present in the main ES repo.
        maybe_an_artifact = os.path.join(self.distributions_root, self.file_name)
        if os.path.exists(maybe_an_artifact):
            self.cached_path = maybe_an_artifact
        else:
            resolved_revision = self.source_supplier.fetch()
            if resolved_revision:
                # ensure we use the resolved revision for rendering the artifact
                self.file_resolver.revision = resolved_revision
                self.cached_path = os.path.join(self.distributions_root, self.file_name)

    def prepare(self):
        if not self.cached:
            self.source_supplier.prepare()

    def add(self, binaries):
        if self.cached:
            self.logger.info("Using cached artifact in [%s]", self.cached_path)
            binaries[self.file_resolver.artifact_key] = self.file_resolver.to_artifact_path(self.cached_path)
        else:
            self.source_supplier.add(binaries)
            original_path = self.file_resolver.to_file_system_path(binaries[self.file_resolver.artifact_key])
            # this can be None if the Elasticsearch sources do not reside in a git repo and the user has only
            # copied the source files. In that case, we cannot resolve a revision hash and thus we cannot cache.
            if self.cached_path:
                try:
                    io.ensure_dir(io.dirname(self.cached_path))
                    shutil.copy(original_path, self.cached_path)
                    self.logger.info("Caching artifact in [%s]", self.cached_path)
                    binaries[self.file_resolver.artifact_key] = self.file_resolver.to_artifact_path(self.cached_path)
                except OSError:
                    self.logger.exception("Not caching [%s].", original_path)
            else:
                self.logger.info("Not caching [%s] (no revision info).", original_path)


class ElasticsearchSourceSupplier:
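    """
    Provides Elasticsearch from a source checkout: fetches the requested revision via
    ``SourceRepository``, runs the car's Gradle build commands, and resolves the resulting
    tar.gz distribution artifact.
    """
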
    def __init__(self, revision, es_src_dir, remote_url, car, builder, template_renderer):
        self.logger = logging.getLogger(__name__)
        self.revision = revision
        self.src_dir = es_src_dir
        self.remote_url = remote_url
        self.car = car
        self.builder = builder
        self.template_renderer = template_renderer

    def fetch(self):
        return SourceRepository("Elasticsearch", self.remote_url, self.src_dir, branch=DEFAULT_ELASTICSEARCH_BRANCH).fetch(self.revision)

    def prepare(self):
        if self.builder:
            self.builder.build_jdk = self.resolve_build_jdk_major(self.src_dir)

            # There are no 'x86_64' specific gradle build commands
            if self.template_renderer.arch != "x86_64":
                commands = [
                    self.template_renderer.render(self.car.mandatory_var("clean_command")),
                    self.template_renderer.render(self.car.mandatory_var("system.build_command.arch")),
                ]
            else:
                commands = [
                    self.template_renderer.render(self.car.mandatory_var("clean_command")),
                    self.template_renderer.render(self.car.mandatory_var("system.build_command")),
                ]

            self.builder.build(commands)

    def add(self, binaries):
        binaries["elasticsearch"] = self.resolve_binary()

    @classmethod
    def resolve_build_jdk_major(cls, src_dir: str) -> int:
        """
        Parses the build JDK major release version from the Elasticsearch repo
        :param src_dir: The source directory for the Elasticsearch repository
        :return: The build JDK major release version
        """
        logger = logging.getLogger(__name__)
        # This .properties file defines the versions of Java with which to
        # build and test Elasticsearch for this branch. Valid Java versions
        # are 'java' or 'openjdk' followed by the major release number.
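        # e.g. a line "ES_BUILD_JAVA=openjdk17" resolves to build JDK major version 17.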
        path = os.path.join(src_dir, ".ci", "java-versions.properties")
        major_version = None
        try:
            with open(path, encoding="UTF-8") as f:
                for line in f.readlines():
                    if "ES_BUILD_JAVA" in line:
                        java_version = line.split("=")[1].lstrip().rstrip("\n")
                        # e.g. java11
                        if "java" in java_version:
                            major_version = java_version.split("java")[1]
                        # e.g. openjdk16
                        elif "openjdk" in java_version:
                            major_version = java_version.split("openjdk")[1]
                        break
        except FileNotFoundError:
            msg = f"File [{path}] not found."
            console.warn(f"{msg}")
            logger.warning("%s", msg)

        if major_version:
            logger.info("Setting build JDK major release version to [%s].", major_version)
        else:
            major_version = 17
            logger.info("Unable to resolve build JDK major release version. Defaulting to version [%s].", major_version)
        return int(major_version)  # type: ignore[arg-type]  # TODO remove this ignore when introducing type hints

    def resolve_binary(self):
        try:
            # There are no 'x86_64' specific gradle build commands
            if self.template_renderer.arch != "x86_64":
                system_artifact_path = self.car.mandatory_var("system.artifact_path_pattern.arch")
            else:
                system_artifact_path = self.car.mandatory_var("system.artifact_path_pattern")
            path = os.path.join(self.src_dir, self.template_renderer.render(system_artifact_path))
            return glob.glob(path)[0]
        except IndexError:
            raise SystemSetupError("Couldn't find a tar.gz distribution. Please run Rally with the pipeline 'from-sources'.")


class PluginFileNameResolver:
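    """
    Resolves cache file names for plugin artifacts, e.g. a plugin "my-plugin" at revision
    "abc123" is cached as "my-plugin-abc123.zip" (names are illustrative).
    """
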
    def __init__(self, plugin_name, revision=None):
        self.plugin_name = plugin_name
        self.revision = revision

    @property
    def file_name(self):
        return f"{self.plugin_name}-{self.revision}.zip"

    @property
    def artifact_key(self):
        return self.plugin_name

    def to_artifact_path(self, file_system_path):
        return f"file://{file_system_path}"

    def to_file_system_path(self, artifact_path):
        return artifact_path[len("file://") :]


class ExternalPluginSourceSupplier:
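    """
    Builds a non-core plugin from its own source tree. The plugin sources are located via
    either "plugin.<name>.src.dir" (an absolute directory) or "plugin.<name>.src.subdir"
    (relative to Rally's source root directory) in the [source] config section.
    """
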
    def __init__(self, plugin, revision, src_dir, src_config, builder):
        assert not plugin.core_plugin, "Plugin %s is a core plugin" % plugin.name
        self.plugin = plugin
        self.revision = revision
        # may be None if and only if the user has set an absolute plugin directory
        self.src_dir = src_dir
        self.src_config = src_config
        self.builder = builder
        subdir_cfg_key = "plugin.%s.src.subdir" % self.plugin.name
        dir_cfg_key = "plugin.%s.src.dir" % self.plugin.name
        if dir_cfg_key in self.src_config and subdir_cfg_key in self.src_config:
            raise exceptions.SystemSetupError("Can only specify one of %s and %s but both are set." % (dir_cfg_key, subdir_cfg_key))

        if dir_cfg_key in self.src_config:
            self.plugin_src_dir = _config_value(self.src_config, dir_cfg_key)
            # we must build directly in the plugin dir, not relative to Elasticsearch
            self.override_build_dir = self.plugin_src_dir
        elif subdir_cfg_key in self.src_config:
            self.plugin_src_dir = os.path.join(self.src_dir, _config_value(self.src_config, subdir_cfg_key))
            self.override_build_dir = None
        else:
            raise exceptions.SystemSetupError("Neither %s nor %s are set for plugin %s." % (dir_cfg_key, subdir_cfg_key, self.plugin.name))

    @staticmethod
    def can_handle(plugin):
        return not plugin.core_plugin

    def fetch(self):
        # optional (but then source code is assumed to be available locally)
        plugin_remote_url = self.src_config.get("plugin.%s.remote.repo.url" % self.plugin.name)
        return SourceRepository(self.plugin.name, plugin_remote_url, self.plugin_src_dir, branch=DEFAULT_PLUGIN_BRANCH).fetch(self.revision)

    def prepare(self):
        if self.builder:
            command = _config_value(self.src_config, f"plugin.{self.plugin.name}.build.command")
            build_cmd = f"export JAVA_HOME={self.builder.java_home}; cd {self.override_build_dir}; {command}"
            self.builder.build([build_cmd])

    def add(self, binaries):
        binaries[self.plugin.name] = self.resolve_binary()

    def resolve_binary(self):
        artifact_path = _config_value(self.src_config, "plugin.%s.build.artifact.subdir" % self.plugin.name)
        try:
            name = glob.glob("%s/%s/*.zip" % (self.plugin_src_dir, artifact_path))[0]
            return "file://%s" % name
        except IndexError:
            raise SystemSetupError(
                "Couldn't find a plugin zip file for [%s]. Please run Rally with the pipeline 'from-sources'." % self.plugin.name
            )


class CorePluginSourceSupplier:
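    """
    Builds a core plugin from within the Elasticsearch source tree via the corresponding
    Gradle task (":plugins:<name>:assemble").
    """
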
    def __init__(self, plugin, es_src_dir, builder):
        assert plugin.core_plugin, "Plugin %s is not a core plugin" % plugin.name
        self.plugin = plugin
        self.es_src_dir = es_src_dir
        self.builder = builder

    @staticmethod
    def can_handle(plugin):
        return plugin.core_plugin

    def fetch(self):
        # Just retrieve the current revision *number* and assume that Elasticsearch has prepared the source tree.
        return SourceRepository("Elasticsearch", None, self.es_src_dir, branch=DEFAULT_PLUGIN_BRANCH).fetch(revision="current")

    def prepare(self):
        if self.builder:
            self.builder.build_jdk = ElasticsearchSourceSupplier.resolve_build_jdk_major(self.es_src_dir)
            self.builder.build([f"./gradlew :plugins:{self.plugin.name}:assemble"])

    def add(self, binaries):
        binaries[self.plugin.name] = self.resolve_binary()

    def resolve_binary(self):
        try:
            name = glob.glob("%s/plugins/%s/build/distributions/*.zip" % (self.es_src_dir, self.plugin.name))[0]
            return "file://%s" % name
        except IndexError:
            raise SystemSetupError(
                "Couldn't find a plugin zip file for [%s]. Please run Rally with the pipeline 'from-sources'." % self.plugin.name
            )


class ElasticsearchDistributionSupplier:
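    """
    Downloads an official Elasticsearch distribution into the local distributions directory,
    reusing a previously downloaded binary if present and caching is enabled for the repository.
    """
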
    def __init__(self, repo, version, distributions_root):
        self.repo = repo
        self.version = version
        self.distributions_root = distributions_root
        # will be defined in the prepare phase
        self.distribution_path = None
        self.logger = logging.getLogger(__name__)

    def fetch(self):
        io.ensure_dir(self.distributions_root)
        download_url = net.add_url_param_elastic_no_kpi(self.repo.download_url)
        distribution_path = os.path.join(self.distributions_root, self.repo.file_name)
        self.logger.info("Resolved download URL [%s] for version [%s]", download_url, self.version)
        if not os.path.isfile(distribution_path) or not self.repo.cache:
            try:
                self.logger.info("Starting download of Elasticsearch [%s]", self.version)
                progress = net.Progress("[INFO] Downloading Elasticsearch %s" % self.version)
                net.download(download_url, distribution_path, progress_indicator=progress)
                progress.finish()
                self.logger.info("Successfully downloaded Elasticsearch [%s].", self.version)
            except urllib.error.HTTPError:
                self.logger.exception("Cannot download Elasticsearch distribution for version [%s] from [%s].", self.version, download_url)
                raise exceptions.SystemSetupError(
                    "Cannot download Elasticsearch distribution from [%s]. Please check that the specified "
                    "version [%s] is correct." % (download_url, self.version)
                )
        else:
            self.logger.info("Skipping download for version [%s]. Found an existing binary at [%s].", self.version, distribution_path)

        self.distribution_path = distribution_path

    def prepare(self):
        pass

    def add(self, binaries):
        binaries["elasticsearch"] = self.distribution_path


class PluginDistributionSupplier:
    def __init__(self, repo, plugin):
        self.repo = repo
        self.plugin = plugin

    def fetch(self):
        pass

    def prepare(self):
        pass

    def add(self, binaries):
        # if we have multiple plugin configurations for a plugin, we will override entries here, but as this is
        # always the same key-value pair, this is ok.
        plugin_url = self.repo.plugin_download_url(self.plugin.name)
        if plugin_url:
            binaries[self.plugin.name] = plugin_url


def _config_value(src_config, key):
    try:
        return src_config[key]
    except KeyError:
        raise exceptions.SystemSetupError(
            "Mandatory config key [%s] is undefined. Please add it in the [source] section of the config file." % key
        )


def _extract_revisions(revision):
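    """
    Parses the comma-separated revision string into a dict keyed by component name.

    e.g. (illustrative):

    * "abc123" -> {"elasticsearch": "abc123", "all": "abc123"}
    * "elasticsearch:abc123,my-plugin:current" -> {"elasticsearch": "abc123", "my-plugin": "current"}
    """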
    revisions = revision.split(",") if revision else []
    if len(revisions) == 1:
        c, r = _component_from_revision(revisions[0])
        if c.startswith("elasticsearch:"):
            r = r[len("elasticsearch:") :]
        # elasticsearch or single plugin
        if c:
            return {c: r}
        else:
            return {
                "elasticsearch": r,
                # use a catch-all value
                "all": r,
            }
    else:
        results = {}
        for rev in revisions:
            c, r = _component_from_revision(rev)
            if c:
                results[c] = r
            else:
                raise exceptions.SystemSetupError("Revision [%s] does not match expected format [name:revision]." % r)
        return results


def _branch_from_revision_with_ts(revision, default_branch):
    """
    Extracts the branch and revision from a `revision` that uses @timestamp.
    If a branch can't be found in `revision`, default_branch is used.
    """

    # ":" separator maybe used in both the timestamp and the component
    # e.g. elasticsearch:<branch>@TS
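    # Examples (illustrative): "7.x@2023-04-20T01:00:00Z" yields ("7.x", "2023-04-20T01:00:00Z");
    # "@2023-04-20T01:00:00Z" carries no branch and yields (default_branch, "2023-04-20T01:00:00Z").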
    _, r = _component_from_revision(revision)
    branch, git_ts_revision = r.split("@")
    if not branch:
        branch = default_branch
    return branch, git_ts_revision


def _component_from_revision(revision):
    """Extracts the (optional) component and remaining data from `revision`"""

    component = ""
    r = revision
    if "@" not in revision and ":" in revision:
        # e.g. elasticsearch:current
        component, r = revision.split(":")
    elif "@" in revision and ":" in revision:
        # e.g. "elasticsearch:<optional_branch>@2023-04-20T01:00:00Z"
        revision_without_ts = revision[: revision.find("@")]
        if ":" in revision_without_ts:
            component = revision_without_ts.split(":")[0]
            r = revision[revision.find(":", 1) + 1 :]
    return component, r


class SourceRepository:
    """
    Supplier fetches the benchmark candidate source tree from the remote repository.
    """

    def __init__(self, name, remote_url, src_dir, *, branch):
        self.name = name
        self.remote_url = remote_url
        self.src_dir = src_dir
        self.branch = branch
        self.logger = logging.getLogger(__name__)

    def fetch(self, revision):
        # if and only if we want to benchmark the current revision, Rally may skip repo initialization (if it is already present)
        self._try_init(may_skip_init=revision == "current")
        return self._update(revision)

    def has_remote(self):
        return self.remote_url is not None

    def _try_init(self, may_skip_init=False):
        if not git.is_working_copy(self.src_dir):
            if self.has_remote():
                self.logger.info("Downloading sources for %s from %s to %s.", self.name, self.remote_url, self.src_dir)
                git.clone(self.src_dir, remote=self.remote_url)
            elif os.path.isdir(self.src_dir) and may_skip_init:
                self.logger.info("Skipping repository initialization for %s.", self.name)
            else:
                raise exceptions.SystemSetupError("A remote repository URL is mandatory for %s" % self.name)

    def _update(self, revision):
        if self.has_remote() and revision == "latest":
            self.logger.info("Fetching latest sources for %s from origin.", self.name)
            git.pull(self.src_dir, remote="origin", branch=self.branch)
        elif revision == "current":
            self.logger.info("Skip fetching sources for %s.", self.name)
        # revision contains a timestamp
        elif self.has_remote() and "@" in revision:
            branch, git_ts_revision = _branch_from_revision_with_ts(revision, self.branch)
            self.logger.info(
                "Fetching from remote and checking out revision with timestamp [%s] from branch %s for %s.",
                git_ts_revision,
                branch,
                self.name,
            )
            git.pull_ts(self.src_dir, git_ts_revision, remote="origin", branch=branch, default_branch=DEFAULT_ELASTICSEARCH_BRANCH)
        elif self.has_remote():  # we can have either a commit hash, branch name, or tag
            git.fetch(self.src_dir, remote="origin")
            if git.is_branch(self.src_dir, identifier=revision):
                self.logger.info("Fetching from remote and checking out branch [%s] for %s.", revision, self.name)
                git.checkout_branch(self.src_dir, remote="origin", branch=revision)
            else:  # tag or commit hash
                self.logger.info("Fetching from remote and checking out revision [%s] for %s.", revision, self.name)
                git.checkout_revision(self.src_dir, revision=revision)
        else:
            self.logger.info("Checking out local revision [%s] for %s.", revision, self.name)
            git.checkout(self.src_dir, branch=revision)
        if git.is_working_copy(self.src_dir):
            git_revision = git.head_revision(self.src_dir)
            self.logger.info("User-specified revision [%s] for [%s] results in git revision [%s]", revision, self.name, git_revision)
            return git_revision
        else:
            self.logger.info("Skipping git revision resolution for %s (%s is not a git repository).", self.name, self.src_dir)
            return None

    @classmethod
    def is_commit_hash(cls, revision):
        return revision != "latest" and revision != "current" and not revision.startswith("@")


class Builder:
    """
    A builder is responsible for creating an installable binary from the source files.

    It is not intended to be used directly but should be triggered by its mechanic.
    """

    def __init__(self, src_dir, build_jdk=None, log_dir=None):
        self.src_dir = src_dir
        self.build_jdk = build_jdk
        self._java_home = None
        self.log_dir = log_dir
        self.logger = logging.getLogger(__name__)

    @property
    def java_home(self):
        if not self._java_home:
            _, self._java_home = jvm.resolve_path(self.build_jdk)
        return self._java_home

    def build(self, commands, override_src_dir=None):
        for command in commands:
            self.run(command, override_src_dir)

    def run(self, command, override_src_dir=None):
        src_dir = self.src_dir if override_src_dir is None else override_src_dir

        io.ensure_dir(self.log_dir)
        log_file = os.path.join(self.log_dir, "build.log")

        # we capture all output to a dedicated build log file
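        # e.g. (illustrative; actual paths and Gradle tasks depend on the local setup and car config):
        # export JAVA_HOME=/usr/lib/jvm/jdk-17; cd ~/src/elasticsearch; \
        #   ./gradlew :distribution:archives:linux-tar:assemble >> ~/.rally/logs/build.log 2>&1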
        build_cmd = f"export JAVA_HOME={self.java_home}; cd {src_dir}; {command} >> {log_file} 2>&1"
        console.info("Creating installable binary from source files")
        self.logger.info("Running build command [%s]", build_cmd)

        if process.run_subprocess(build_cmd) != 0:
            msg = f"Executing '{command}' failed. The last 20 lines in the build log file are:\n"
            msg += "=========================================================================================================\n"
            with open(log_file, encoding="utf-8") as f:
                msg += "\t"
                msg += "\t".join(f.readlines()[-20:])
            msg += "=========================================================================================================\n"
            msg += f"The full build log is available at [{log_file}]."

            raise BuildError(msg)


class DockerBuilder:
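    """
    Builds Elasticsearch from sources inside a Docker container instead of on the host.
    The source directory is bind-mounted into the build container and all build output is
    captured in the same build.log file that the regular ``Builder`` uses.
    """
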
    def __init__(self, src_dir, build_jdk=None, log_dir=None, client=None):
        self.client = client
        self.src_dir = src_dir
        self.build_jdk = build_jdk
        self.log_dir = log_dir
        io.ensure_dir(self.log_dir)
        self.log_file = os.path.join(self.log_dir, "build.log")
        self.logger = logging.getLogger(__name__)

        try:
            self.logger.info(self.client.version())
        except docker.errors.APIError as e:
            raise exceptions.SystemSetupError(self.err_msg(e))

        self.user_id = os.geteuid()
        self.user_name = getpass.getuser()
        self.group_id = os.getgid()
        try:
            # we need the user's group name (resolved from the group id) to add it to our container image
            # for permissions on the bind mount
            self.group_name = grp.getgrgid(self.group_id)[0]
        except KeyError:
            raise SystemSetupError("Failed to retrieve current user's group name required for Docker source build method.")

        self.build_container_name = "esrally-source-builder"
        self.image_name = f"{self.build_container_name}-image"
        self.image_builder_container_name = f"{self.build_container_name}-image-builder"

    @staticmethod
    def err_msg(err):
        return (
            f"Error communicating with Docker daemon: [{err}]. Please ensure Docker is installed and your user has the correct permissions."
        )

    def resolve_jdk_build_container_image(self, build_jdk: int) -> str:
        """
        Given a JDK major release version, find a suitable Docker image for compiling ES and ES plugins from sources.
        :param build_jdk: The build JDK major version required to build Elasticsearch and plugins.
        :return: The Docker image name and respective tag that satisfies the 'build_jdk' requirement.
        """
        logger = logging.getLogger(__name__)
        # Temurin only maintains images for JDK 11, 17, and 18 (at the time of writing).
        # Older builds of ES may require specific versions like JDK 14, in which case we rely on the OpenJDK images.
        #
        # We modify the base image (create user/group, install git), so we need to ensure the container's base image
        # is Debian-based (apt required).
        if build_jdk >= 17:
            docker_image_name = f"eclipse-temurin:{build_jdk}-jdk-jammy"
            logger.info("Build JDK version [%s] is >= 17, using Docker image [%s].", build_jdk, docker_image_name)
        # There is no Debian based OpenJDK JDK 12 image
        elif build_jdk == 12:
            docker_image_name = "adoptopenjdk/openjdk12:debian"
        else:
            docker_image_name = f"openjdk:{build_jdk}-jdk-buster"
            logger.info("Build JDK version [%s] is < 17, using Docker image [%s].", build_jdk, docker_image_name)
        return docker_image_name

    def stop_containers(self, container_names):
        for c in container_names:
            try:
                existing_container = self.client.containers.get(c)
                existing_container.stop()
                existing_container.remove()
            except docker.errors.NotFound:
                pass
            except docker.errors.APIError as e:
                raise exceptions.SystemSetupError(self.err_msg(e))

    def tail_container_logs(self, output, container_name):
        try:
            with open(self.log_file, "a+", encoding="utf-8") as f:
                while True:
                    line = next(output).decode("utf-8")
                    f.write(line)
                    f.flush()
        except StopIteration:
            self.logger.info("Log stream ended for [%s]", container_name)

    def check_container_return_code(self, completion, container_name):
        if completion["StatusCode"] != 0:
            msg = f"Executing '{container_name}' failed. The last 20 lines in the build.log file are:\n"
            msg += "=========================================================================================================\n"
            with open(self.log_file, encoding="utf-8") as f:
                msg += "\t"
                msg += "\t".join(f.readlines()[-20:])
            msg += "=========================================================================================================\n"
            msg += f"The full build log is available at [{self.log_file}]"
            raise BuildError(
                f"Docker container [{container_name}] failed with status code [{completion['StatusCode']}]: "
                f"Error [{completion['Error']}]: Build log output [{msg}]"
            )
        self.logger.info("Container [%s] completed successfully.", container_name)

    def create_base_container_image(self):
        try:
            # create a new image with the required users & git installed
            image_container = self.client.containers.run(
                detach=True,
                name=self.image_builder_container_name,
                image=self.resolve_jdk_build_container_image(self.build_jdk),
                command=(
                    # {1..5} is for retrying apt update/install in case of transient network issues
                    '/bin/bash -c "for i in {1..5}; do apt update -y; apt install git -y && break || sleep 15; done; '
                    f"useradd --create-home -u {self.user_id} {self.user_name}; "
                    f"groupadd -g {self.group_id} {self.group_name}; "
                    f'usermod -g {self.group_name} {self.user_name}"'
                ),
            )

            output = image_container.logs(stream=True)
            self.tail_container_logs(output, self.image_builder_container_name)

            # wait for container to complete
            completion = image_container.wait()
            self.check_container_return_code(completion, self.image_builder_container_name)

            # create the image (i.e. docker commit)
            image = image_container.commit(self.image_name)

            return image

        except docker.errors.APIError as e:
            raise exceptions.SystemSetupError(self.err_msg(e))

    def build(self, commands):
        build_command = "; ".join(commands)
        self.run(build_command)

    def run(self, command):
        # stop & remove any pre-existing containers
        self.stop_containers([self.build_container_name, self.image_builder_container_name])
        # build our new base image
        container_image = self.create_base_container_image()
        try:
            console.info("Using Docker to create installable binary from source files")
            container = self.client.containers.run(
                detach=True,
                name=self.build_container_name,
                image=container_image.id,
                user=self.user_name,
                group_add=[self.group_id],
                command=f"/bin/bash -c \"git config --global --add safe.directory '*'; {command}\"",
                volumes=[f"{self.src_dir}:/home/{self.user_name}/elasticsearch"],
                working_dir=f"/home/{self.user_name}/elasticsearch",
            )

            output = container.logs(stream=True)
            self.tail_container_logs(output, self.build_container_name)

            # wait for container to complete
            completion = container.wait()
            self.check_container_return_code(completion, self.build_container_name)

        except docker.errors.APIError as e:
            raise exceptions.SystemSetupError(self.err_msg(e))


class DistributionRepository:
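    """
    Resolves download URLs and file names for Elasticsearch and plugin distributions from
    the distribution config, e.g. for the repository name "release" the team repo key
    "jdk.bundled.release_url" can be overridden by a user-defined "release.url" key.
    """
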
    def __init__(self, name, distribution_config, template_renderer):
        self.name = name
        self.cfg = distribution_config
        self.runtime_jdk_bundled = convert.to_bool(self.cfg.get("runtime.jdk.bundled", False))
        self.template_renderer = template_renderer

    @property
    def download_url(self):
        # team repo
        if self.runtime_jdk_bundled:
            default_key = f"jdk.bundled.{self.name}_url"
        else:
            default_key = f"jdk.unbundled.{self.name}_url"
        # rally.ini
        override_key = f"{self.name}.url"
        return self._url_for(override_key, default_key)

    @property
    def file_name(self):
        url = self.download_url
        return url[url.rfind("/") + 1 :]

    def plugin_download_url(self, plugin_name):
        # team repo
        default_key = f"plugin_{plugin_name}_{self.name}_url"
        # rally.ini
        override_key = f"plugin.{plugin_name}.{self.name}.url"
        return self._url_for(override_key, default_key, mandatory=False)

    def _url_for(self, user_defined_key, default_key, mandatory=True):
        try:
            if user_defined_key in self.cfg:
                url_template = self.cfg[user_defined_key]
            else:
                url_template = self.cfg[default_key]
        except KeyError:
            if mandatory:
                raise exceptions.SystemSetupError(f"Neither config key [{user_defined_key}] nor [{default_key}] is defined.")
            return None
        return self.template_renderer.render(url_template)

    @property
    def cache(self):
        k = f"{self.name}.cache"
        try:
            raw_value = self.cfg[k]
        except KeyError:
            raise exceptions.SystemSetupError("Mandatory config key [%s] is undefined." % k)
        try:
            return convert.to_bool(raw_value)
        except ValueError:
            raise exceptions.SystemSetupError("Value [%s] for config key [%s] is not a valid boolean value." % (raw_value, k))
