# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import json
import logging
import logging.config
import os
import time
import typing

import ecs_logging

from esrally import paths
from esrally.utils import collections, io


# pylint: disable=unused-argument
def configure_utc_formatter(*args: typing.Any, **kwargs: typing.Any) -> logging.Formatter:
    """
    Logging formatter that renders timestamps UTC, or in the local system time zone when the user requests it.
    """
    formatter = logging.Formatter(fmt=kwargs["format"], datefmt=kwargs["datefmt"])
    user_tz = kwargs.get("timezone", None)
    if user_tz == "localtime":
        formatter.converter = time.localtime
    else:
        formatter.converter = time.gmtime

    return formatter


MutatorType = typing.Callable[[logging.LogRecord, dict[str, typing.Any]], None]


class RallyEcsFormatter(ecs_logging.StdlibFormatter):
    def __init__(
        self,
        *args: typing.Any,
        mutators: typing.Optional[list[MutatorType]] = None,
        **kwargs: typing.Any,
    ):
        super().__init__(*args, **kwargs)
        self.mutators = mutators or []

    def format_to_ecs(self, record: logging.LogRecord) -> dict[str, typing.Any]:
        log_dict = super().format_to_ecs(record)
        self.apply_mutators(record, log_dict)
        return log_dict

    def apply_mutators(self, record: logging.LogRecord, log_dict: dict[str, typing.Any]) -> None:
        for mutator in self.mutators:
            mutator(record, log_dict)


def rename_actor_fields(record: logging.LogRecord, log_dict: dict[str, typing.Any]) -> None:
    fields = {}
    if log_dict.get("actorAddress"):
        fields["address"] = log_dict.pop("actorAddress")
    if fields:
        collections.deep_update(log_dict, {"rally": {"thespian": fields}})


# Special case for asyncio fields as they are not part of the standard ECS log dict
def rename_async_fields(record: logging.LogRecord, log_dict: dict[str, typing.Any]) -> None:
    fields = {}
    if hasattr(record, "taskName") and record.taskName is not None:
        fields["task"] = record.taskName
    if fields:
        collections.deep_update(log_dict, {"python": {"asyncio": fields}})


def configure_ecs_formatter(*args: typing.Any, **kwargs: typing.Any) -> ecs_logging.StdlibFormatter:
    """
    ECS Logging formatter
    """
    fmt = kwargs.pop("format", None)
    configurator = logging.config.BaseConfigurator({})
    mutators = kwargs.pop("mutators", [rename_actor_fields, rename_async_fields])
    mutators = [fn if callable(fn) else configurator.resolve(fn) for fn in mutators]

    formatter = RallyEcsFormatter(fmt=fmt, mutators=mutators, *args, **kwargs)
    return formatter


def log_config_path():
    """
    :return: The absolute path to Rally's log configuration file.
    """
    return os.path.join(paths.rally_confdir(), "logging.json")


def add_missing_loggers_to_config():
    """
    Ensures that any missing top level loggers in resources/logging.json are
    appended to an existing log configuration
    """

    def _missing_loggers(source, target):
        """
        Returns any top-level loggers present in 'source', but not in 'target'
        :return: A dict of all loggers present in 'source', but not in 'target'
        """
        missing_loggers = {}
        for logger in source:
            if logger in source and logger in target:
                continue
            else:
                missing_loggers[logger] = source[logger]
        return missing_loggers

    source_path = io.normalize_path(os.path.join(os.path.dirname(__file__), "resources", "logging.json"))

    with open(log_config_path(), encoding="UTF-8") as target:
        with open(source_path, encoding="UTF-8") as src:
            template = json.load(src)
            existing_logging_config = json.load(target)
            if missing_loggers := _missing_loggers(source=template["loggers"], target=existing_logging_config["loggers"]):
                existing_logging_config["loggers"].update(missing_loggers)
                updated_config = json.dumps(existing_logging_config, indent=2)

    if missing_loggers:
        with open(log_config_path(), "w", encoding="UTF-8") as target:
            target.write(updated_config)


def install_default_log_config():
    """
    Ensures a log configuration file is present on this machine. The default
    log configuration is based on the template in resources/logging.json.

    It also ensures that the default log path has been created so log files
    can be successfully opened in that directory.
    """
    log_config = log_config_path()
    if not io.exists(log_config):
        io.ensure_dir(io.dirname(log_config))
        source_path = io.normalize_path(os.path.join(os.path.dirname(__file__), "resources", "logging.json"))
        with open(log_config, "w", encoding="UTF-8") as target:
            with open(source_path, encoding="UTF-8") as src:
                contents = src.read()
                target.write(contents)
    add_missing_loggers_to_config()
    io.ensure_dir(paths.logs())


# pylint: disable=unused-argument
def configure_file_handler(*args, **kwargs) -> logging.Handler:
    """
    Configures the WatchedFileHandler supporting expansion of `~` and `${LOG_PATH}` to the user's home and the log path respectively.
    """
    filename = kwargs.pop("filename").replace("${LOG_PATH}", paths.logs())
    return logging.handlers.WatchedFileHandler(filename=filename, encoding=kwargs["encoding"], delay=kwargs.get("delay", False))


def configure_profile_file_handler(*args, **kwargs) -> logging.Handler:
    """
    Configures the FileHandler supporting expansion of `~` and `${LOG_PATH}` to the user's home and the log path respectively.
    """
    filename = kwargs.pop("filename").replace("${LOG_PATH}", paths.logs())
    return logging.FileHandler(filename=filename, encoding=kwargs["encoding"], delay=kwargs.get("delay", False))


def load_configuration():
    """
    Loads the logging configuration. This is a low-level method and usually
    `configure_logging()` should be used instead.

    :return: The logging configuration as `dict` instance.
    """
    with open(log_config_path()) as f:
        return json.load(f)


def post_configure_actor_logging():
    """
    Reconfigures all loggers in actor processes.

    See https://groups.google.com/forum/#!topic/thespianpy/FntU9umtvhc for the rationale.
    """
    # see configure_logging()
    logging.captureWarnings(True)

    # at this point we can assume that a log configuration exists. It has been created already during startup.
    logger_configuration = load_configuration()
    if "root" in logger_configuration and "level" in logger_configuration["root"]:
        root_logger = logging.getLogger()
        root_logger.setLevel(logger_configuration["root"]["level"])

    if "loggers" in logger_configuration:
        for lgr, cfg in load_configuration()["loggers"].items():
            if "level" in cfg:
                logging.getLogger(lgr).setLevel(cfg["level"])


def configure_logging():
    """
    Configures logging for the current process.
    """

    logging.config.dictConfig(load_configuration())

    # Avoid failures such as the following (shortened a bit):
    #
    # ---------------------------------------------------------------------------------------------
    # "esrally/driver/driver.py", line 220, in create_client
    # "thespian-3.8.0-py3.5.egg/thespian/actors.py", line 187, in createActor
    # [...]
    # "thespian-3.8.0-py3.5.egg/thespian/system/multiprocCommon.py", line 348, in _startChildActor
    # "python3.5/multiprocessing/process.py", line 105, in start
    # "python3.5/multiprocessing/context.py", line 267, in _Popen
    # "python3.5/multiprocessing/popen_fork.py", line 18, in __init__
    # sys.stderr.flush()
    #
    # OSError: [Errno 5] Input/output error
    # ---------------------------------------------------------------------------------------------
    #
    # This is caused by urllib3 wanting to send warnings about insecure SSL connections to stderr when we disable them (in client.py) with:
    #
    #   urllib3.disable_warnings()
    #
    # The filtering functionality of the warnings module causes the error above on some systems. If we instead redirect the warning output
    # to our logs instead of stderr (which is the warnings module's default), we can disable warnings safely.
    logging.captureWarnings(True)
