#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import contextlib
import logging
import threading
import traceback
import warnings
from copy import deepcopy

from .compat import six

try:
    import contextvars
except ImportError:
    contextvars = None


DEFAULT_BLOCK_BUFFER_SIZE = 20 * 1024**2
DEFAULT_CHUNK_SIZE = 65536
DEFAULT_CONNECT_RETRY_TIMES = 4
DEFAULT_CONNECT_TIMEOUT = 120
DEFAULT_READ_TIMEOUT = 120
DEFAULT_POOL_CONNECTIONS = 10
DEFAULT_POOL_MAXSIZE = 10
DEFAULT_RETRY_DELAY = 0.1
_DEFAULT_REDIRECT_WARN = "Option {source} has been replaced by {target} and might be removed in a future release."


class OptionError(AttributeError):
    pass


class Redirection(object):
    def __init__(self, item, warn=None):
        self._items = item.split(".")
        self._warn = warn
        self._warned = True
        self._parent = None

    def bind(self, attr_dict):
        self._parent = attr_dict
        self.getvalue()
        self._warned = False

    def getvalue(self, silent=False):
        if not silent and self._warn and not self._warned:
            in_completer = any(
                1 for st in traceback.extract_stack() if "completer" in st[0].lower()
            )
            if not in_completer:
                self._warned = True
                warnings.warn(self._warn)
        conf = self._parent.root
        for it in self._items:
            conf = getattr(conf, it)
        return conf

    def setvalue(self, value, silent=False):
        if not silent and self._warn and not self._warned:
            self._warned = True
            warnings.warn(self._warn)
        conf = self._parent.root
        for it in self._items[:-1]:
            conf = getattr(conf, it)
        setattr(conf, self._items[-1], value)


class PandasRedirection(Redirection):
    def __init__(self, *args, **kw):
        super(PandasRedirection, self).__init__(*args, **kw)
        self._val = None
        try:
            import pandas  # noqa: F401

            self._use_pd = True
        except (ImportError, ValueError):
            self._use_pd = False

    def getvalue(self, silent=False):
        if self._use_pd:
            import pandas as pd

            try:
                return pd.get_option(".".join(self._items))
            except (KeyError, LookupError, AttributeError):
                self._use_pd = False
        else:
            return self._val

    def setvalue(self, value, silent=False):
        if self._use_pd:
            import pandas as pd

            key = ".".join(self._items)
            if value != pd.get_option(key):
                pd.set_option(key, value)
        else:
            self._val = value


class AttributeDict(dict):
    def __init__(self, *args, **kwargs):
        self._inited = False
        self._parent = kwargs.pop("_parent", None)
        self._root = None
        super(AttributeDict, self).__init__(*args, **kwargs)
        self._inited = True

    @property
    def root(self):
        if self._root is not None:
            return self._root
        if self._parent is None:
            self._root = self
        else:
            self._root = self._parent.root
        return self._root

    def __getattr__(self, item):
        if item in self:
            val = self[item]
            if isinstance(val, AttributeDict):
                return val
            elif isinstance(val[0], Redirection):
                return val[0].getvalue()
            else:
                return val[0]
        return object.__getattribute__(self, item)

    def __dir__(self):
        return list(six.iterkeys(self))

    def register(self, key, value, validator=None):
        self[key] = value, validator
        if isinstance(value, Redirection):
            value.bind(self)

    def unregister(self, key):
        del self[key]

    def add_validator(self, key, validator):
        value, old_validator = self[key]
        validators = getattr(
            old_validator,
            "validators",
            [old_validator] if callable(old_validator) else [],
        )
        validators.append(validator)
        self[key] = (value, all_validator(*validators))

    def _setattr(self, key, value, silent=False):
        if not silent and key not in self:
            raise OptionError("Cannot identify configuration name '%s'." % str(key))

        if not isinstance(value, AttributeDict):
            validate = None
            if key in self:
                val = self[key]
                validate = self[key][1]
                if validate is not None:
                    if not validate(value):
                        raise ValueError("Cannot set value %s" % value)
                if isinstance(val[0], Redirection):
                    val[0].setvalue(value)
                else:
                    self[key] = value, validate
            else:
                self[key] = value, validate
        else:
            self[key] = value

    def __setattr__(self, key, value):
        if key == "_inited":
            super(AttributeDict, self).__setattr__(key, value)
            return
        try:
            object.__getattribute__(self, key)
            super(AttributeDict, self).__setattr__(key, value)
            return
        except AttributeError:
            pass

        if not self._inited:
            super(AttributeDict, self).__setattr__(key, value)
        else:
            self._setattr(key, value)

    def loads(self, d):
        dispatches = collections.defaultdict(dict)
        for k, v in six.iteritems(d):
            if "." in k:
                sk, rk = k.split(".", 1)
                dispatches[sk][rk] = v
            elif isinstance(self[k][0], Redirection):
                self[k][0].setvalue(v, silent=True)
            else:
                setattr(self, k, v)
        for k, v in six.iteritems(dispatches):
            self[k].loads(v)

    def dumps(self):
        from .accounts import BaseAccount

        result_dict = dict()
        for k, v in six.iteritems(self):
            if isinstance(v, AttributeDict):
                result_dict.update(
                    (k + "." + sk, sv) for sk, sv in six.iteritems(v.dumps())
                )
            elif isinstance(v[0], BaseAccount) or callable(v[0]):
                # ignore accounts in config dumps
                result_dict[k] = None
            elif isinstance(v[0], Redirection):
                result_dict[k] = v[0].getvalue(silent=True)
            else:
                result_dict[k] = v[0]
        return result_dict


class Config(object):
    def __init__(self, config=None):
        self._config = config or AttributeDict()

    def __dir__(self):
        return list(six.iterkeys(self._config))

    def __getattr__(self, item):
        return getattr(self._config, item)

    def __setattr__(self, key, value):
        if key == "_config":
            object.__setattr__(self, key, value)
            return
        setattr(self._config, key, value)

    def register_option(self, option, value, validator=None):
        assert validator is None or callable(validator)
        splits = option.split(".")
        conf = self._config

        for name in splits[:-1]:
            config = conf.get(name)
            if config is None:
                val = AttributeDict(_parent=conf)
                conf[name] = val
                conf = val
            elif not isinstance(config, dict):
                raise AttributeError(
                    "Fail to set option: %s, conflict has encountered" % option
                )
            else:
                conf = config

        key = splits[-1]
        if conf.get(key) is not None:
            raise AttributeError("Fail to set option: %s, option has been set" % option)

        conf.register(key, value, validator)

    def register_pandas(self, option, value=None, validator=None):
        redir = PandasRedirection(option)
        self.register_option(option, redir, validator=validator)
        if value is not None:
            redir.setvalue(value)

    def redirect_option(self, option, target, warn=_DEFAULT_REDIRECT_WARN):
        redir = Redirection(target, warn=warn.format(source=option, target=target))
        self.register_option(option, redir)

    def unregister_option(self, option):
        splits = option.split(".")
        conf = self._config
        for name in splits[:-1]:
            config = conf.get(name)
            if not isinstance(config, dict):
                raise AttributeError(
                    "Fail to unregister option: %s, conflict has encountered" % option
                )
            else:
                conf = config

        key = splits[-1]
        if key not in conf:
            raise AttributeError(
                "Option %s not configured, thus failed to unregister." % option
            )
        conf.unregister(key)

    def update(self, new_config):
        if not isinstance(new_config, dict):
            new_config = new_config._config
        for option, value in new_config.items():
            try:
                self.register_option(option, value)
            except AttributeError:
                setattr(self, option, value)

    def add_validator(self, option, validator):
        splits = option.split(".")
        conf = self._config
        for name in splits[:-1]:
            config = conf.get(name)
            if not isinstance(config, dict):
                raise AttributeError(
                    "Fail to add validator: %s, conflict has encountered" % option
                )
            else:
                conf = config

        key = splits[-1]
        if key not in conf:
            raise AttributeError(
                "Option %s not configured, thus failed to set validator." % option
            )
        conf.add_validator(key, validator)

    def loads(self, d):
        return self._config.loads(d)

    def dumps(self):
        return self._config.dumps()


def is_interactive():
    import __main__ as main

    return not hasattr(main, "__file__")


# validators
def any_validator(*validators):
    def validate(x):
        return any(validator(x) for validator in validators)

    return validate


def all_validator(*validators):
    def validate(x):
        return all(validator(x) for validator in validators)

    validate.validators = validators
    return validate


is_null = lambda x: x is None
is_bool = lambda x: isinstance(x, bool)
is_float = lambda x: isinstance(x, float)
is_integer = lambda x: isinstance(x, six.integer_types)
is_string = lambda x: isinstance(x, six.string_types)
is_dict = lambda x: isinstance(x, dict)
is_list = lambda x: isinstance(x, list)


def is_in(vals):
    def validate(x):
        return x in vals

    return validate


def show_deprecated(msg):
    def validator(x):
        warnings.warn(msg, DeprecationWarning)
        return x

    return validator


def verbose_log_validator(val):
    if options.verbose == val:
        # no flip, return directly
        return True

    logging.basicConfig()
    odps_logger = logging.getLogger("odps")
    odps_logger.propagate = False
    if _stream_handler not in odps_logger.handlers:
        odps_logger.addHandler(_stream_handler)

    if val:
        logging.basicConfig()
        old_effective_level = odps_logger.getEffectiveLevel() or logging.WARNING
        target_level = min(logging.INFO, old_effective_level)

        odps_logger.oldLevel = odps_logger.level
        odps_logger.setLevel(target_level)

        _stream_handler.oldLevel = _stream_handler.level
        _stream_handler.setLevel(target_level)
        odps_logger.addHandler(_verbose_log_handler)
    else:
        old_level = getattr(odps_logger, "oldLevel", logging.NOTSET)
        odps_logger.setLevel(old_level)

        old_level = getattr(_stream_handler, "oldLevel", logging.NOTSET)
        _stream_handler.setLevel(old_level)
        odps_logger.removeHandler(_verbose_log_handler)
    return True


class VerboseLogHandler(logging.StreamHandler):
    def emit(self, record):
        try:
            if options.verbose_log:
                msg = self.format(record)
                options.verbose_log(msg)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)


_stream_handler = logging.StreamHandler()
_verbose_log_handler = VerboseLogHandler()


default_options = Config()
default_options.register_option(
    "is_global_account_overwritable", True, validator=is_bool
)
default_options.register_option("account", None)
default_options.register_option("endpoint", None)
default_options.redirect_option("end_point", "endpoint")
default_options.register_option("default_project", None)
default_options.register_option("default_schema", None)
default_options.register_option("default_namespace", None)
default_options.register_option("app_account", None)
default_options.register_option("region_name", None)
default_options.register_option("quota_name", None)
default_options.register_option("local_timezone", None)
default_options.register_option("use_legacy_parsedate", False)
default_options.register_option("allow_antique_date", False)
default_options.register_option(
    "user_agent_pattern",
    "$pyodps_version $python_version $os_version $maxframe_version",
)
default_options.register_option(
    "use_legacy_logview", True, validator=any_validator(is_bool, is_null)
)
default_options.register_option("logview_host", None)
default_options.register_option("logview_hours", 24 * 30, validator=is_integer)
default_options.redirect_option("log_view_host", "logview_host")
default_options.redirect_option("log_view_hours", "logview_hours")
default_options.register_option("api_proxy", None)
default_options.register_option("data_proxy", None)
default_options.redirect_option("tunnel_proxy", "data_proxy")
default_options.register_option("seahawks_url", None)
default_options.register_option("biz_id", None)
default_options.register_option(
    "priority", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option("get_priority", None)
default_options.register_option("temp_lifecycle", 1, validator=is_integer)
default_options.register_option(
    "lifecycle", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option(
    "table_read_limit", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option("completion_size", 10, validator=is_integer)
default_options.register_option(
    "default_task_settings", None, validator=any_validator(is_null, is_dict)
)
default_options.register_option("resource_chunk_size", 64 << 20, validator=is_integer)
default_options.register_option("upload_resource_in_chunks", True, validator=is_bool)
default_options.register_option("verify_ssl", True)
default_options.register_option("enable_schema", False, validator=is_bool)
default_options.redirect_option("always_enable_schema", "enable_schema")
default_options.register_option("table_auto_flush_time", 150, validator=is_integer)
default_options.register_option("struct_as_dict", False, validator=is_bool)
default_options.register_option(
    "struct_as_ordered_dict", None, validator=any_validator(is_bool, is_null)
)
default_options.register_option(
    "map_as_ordered_dict", None, validator=any_validator(is_bool, is_null)
)
default_options.register_option(
    "logview_latency", 10, validator=any_validator(is_float, is_integer)
)
default_options.register_option(
    "progress_time_interval", 5 * 60, validator=any_validator(is_float, is_integer)
)
default_options.register_option("progress_percentage_gap", 5, validator=is_integer)
default_options.register_option("enable_v4_sign", False, validator=is_bool)
default_options.register_option("align_supported_python_tag", True, validator=is_bool)

# c or python mode, use for UT, in other cases, please do not modify the value
default_options.register_option("force_c", False, validator=is_integer)
default_options.register_option("force_py", False, validator=is_integer)

# callbacks for wrappers
default_options.register_option("instance_create_callback", None)
default_options.register_option("tunnel_session_create_callback", None)
default_options.register_option("tunnel_session_create_timeout_callback", None)
default_options.register_option("result_reader_create_callback", None)
default_options.register_option("tunnel_read_timeout_callback", None)
default_options.register_option("skipped_survey_regexes", [])

# network connections
default_options.register_option("chunk_size", DEFAULT_CHUNK_SIZE, validator=is_integer)
default_options.register_option(
    "retry_times", DEFAULT_CONNECT_RETRY_TIMES, validator=is_integer
)
default_options.register_option(
    "retry_delay", DEFAULT_RETRY_DELAY, validator=any_validator(is_integer, is_float)
)
default_options.register_option(
    "connect_timeout", DEFAULT_CONNECT_TIMEOUT, validator=is_integer
)
default_options.register_option(
    "read_timeout", DEFAULT_READ_TIMEOUT, validator=is_integer
)
default_options.register_option(
    "pool_connections", DEFAULT_POOL_CONNECTIONS, validator=is_integer
)
default_options.register_option(
    "pool_maxsize", DEFAULT_POOL_MAXSIZE, validator=is_integer
)

# Tunnel
default_options.register_option("tunnel.endpoint", None)
default_options.register_option("tunnel.string_as_binary", False, validator=is_bool)
default_options.register_option("tunnel.use_instance_tunnel", True, validator=is_bool)
default_options.register_option(
    "tunnel.limit_instance_tunnel", None, validator=any_validator(is_null, is_bool)
)
default_options.register_option(
    "tunnel.legacy_fallback_timeout", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option(
    "tunnel.pd_mem_cache_size", 1024 * 4, validator=is_integer
)
default_options.register_option(
    "tunnel.pd_row_cache_size", 1024 * 16, validator=is_integer
)
default_options.register_option(
    "tunnel.pd_cast_mode", None, validator=is_in([None, "numpy", "arrow"])
)
default_options.register_option(
    "tunnel.read_row_batch_size", 1024, validator=is_integer
)
default_options.register_option(
    "tunnel.write_row_batch_size", 1024, validator=is_integer
)
default_options.register_option(
    "tunnel.batch_merge_threshold", 128, validator=is_integer
)
default_options.register_option(
    "tunnel.overflow_date_as_none", False, validator=is_bool
)
default_options.register_option(
    "tunnel.quota_name", None, validator=any_validator(is_null, is_string)
)
default_options.register_option(
    "tunnel.block_buffer_size", DEFAULT_BLOCK_BUFFER_SIZE, validator=is_integer
)
default_options.register_option(
    "tunnel.use_block_writer_by_default", False, validator=is_bool
)
default_options.register_option(
    "tunnel.tags", None, validator=any_validator(is_null, is_string, is_list)
)
default_options.register_option(
    "tunnel.enable_client_metrics", False, validator=any_validator(is_null, is_bool)
)
default_options.register_option("tunnel.compress.enabled", False, validator=is_bool)
default_options.register_option(
    "tunnel.compress.algo", None, validator=any_validator(is_null, is_string)
)
default_options.register_option("tunnel.compress.level", 1, validator=is_integer)
default_options.register_option("tunnel.compress.strategy", 0, validator=is_integer)

default_options.redirect_option("tunnel_endpoint", "tunnel.endpoint")
default_options.redirect_option("use_instance_tunnel", "tunnel.use_instance_tunnel")
default_options.redirect_option(
    "limited_instance_tunnel", "tunnel.limit_instance_tunnel"
)
default_options.redirect_option(
    "tunnel.limited_instance_tunnel", "tunnel.limit_instance_tunnel"
)

# terminal
default_options.register_option("console.max_lines", None)
default_options.register_option("console.max_width", None)
default_options.register_option("console.use_color", False, validator=is_bool)

# SQL
default_options.register_option(
    "sql.settings", None, validator=any_validator(is_null, is_dict)
)
default_options.register_option("sql.ignore_fields_not_null", False, validator=is_bool)
default_options.register_option(
    "sql.use_odps2_extension", None, validator=any_validator(is_null, is_bool)
)

# sqlalchemy
default_options.register_option(
    "sqlalchemy.project_as_schema", None, validator=any_validator(is_null, is_bool)
)

# DataFrame
default_options.register_option("interactive", is_interactive(), validator=is_bool)
default_options.register_option(
    "verbose", False, validator=all_validator(is_bool, verbose_log_validator)
)
default_options.register_option("verbose_log", None)
default_options.register_option("df.optimize", True, validator=is_bool)
default_options.register_option("df.optimizes.cp", True, validator=is_bool)
default_options.register_option("df.optimizes.pp", True, validator=is_bool)
default_options.register_option("df.optimizes.tunnel", True, validator=is_bool)
default_options.register_option("df.analyze", True, validator=is_bool)
default_options.register_option("df.use_cache", True, validator=is_bool)
default_options.register_option("df.quote", True, validator=is_bool)
default_options.register_option("df.dump_udf", False, validator=is_bool)
default_options.register_option("df.supersede_libraries", True, validator=is_bool)
default_options.register_option("df.libraries", None)
default_options.register_option("df.image", None)
default_options.register_option("df.odps.sort.limit", 10000)
default_options.register_option(
    "df.odps.nan_handler", "py"
)  # None for not handled, builtin for built-in ISNAN function
default_options.register_option(
    "df.sqlalchemy.execution_options", None, validator=any_validator(is_null, is_dict)
)
default_options.register_option("df.seahawks.max_size", 10 * 1024 * 1024 * 1024)  # 10G
default_options.register_option("df.delete_udfs", True, validator=is_bool)
default_options.register_option("df.use_xflow_sample", False, validator=is_bool)
default_options.register_option("df.writer_count_limit", 50, validator=is_integer)

# PyODPS ML
default_options.register_option("ml.xflow_project", "algo_public", validator=is_string)
default_options.register_option(
    "ml.xflow_settings", None, validator=any_validator(is_null, is_dict)
)
default_options.register_option("ml.dry_run", False, validator=is_bool)
default_options.register_option("ml.use_model_transfer", False, validator=is_bool)
default_options.register_option("ml.use_old_metrics", True, validator=is_bool)
default_options.register_option("ml.model_volume", "pyodps_volume", validator=is_string)

# Runner
default_options.redirect_option("runner.dry_run", "ml.dry_run")

# display
from .console import detect_console_encoding

default_options.register_pandas(
    "display.encoding", detect_console_encoding(), validator=is_string
)
default_options.register_pandas(
    "display.max_rows", 60, validator=any_validator(is_null, is_integer)
)
default_options.register_pandas(
    "display.max_columns", 20, validator=any_validator(is_null, is_integer)
)
default_options.register_pandas(
    "display.large_repr", "truncate", validator=is_in(["truncate", "info"])
)
default_options.register_pandas("display.notebook_repr_html", True, validator=is_bool)
default_options.register_pandas("display.precision", 6, validator=is_integer)
default_options.register_pandas("display.float_format", None)
default_options.register_pandas("display.chop_threshold", None)
default_options.register_pandas("display.column_space", 12, validator=is_integer)
default_options.register_pandas("display.pprint_nest_depth", 3, validator=is_integer)
default_options.register_pandas("display.max_seq_items", 100, validator=is_integer)
default_options.register_pandas("display.max_colwidth", 50, validator=is_integer)
default_options.register_pandas("display.multi_sparse", True, validator=is_bool)
default_options.register_pandas(
    "display.colheader_justify", "right", validator=is_string
)
default_options.register_pandas(
    "display.unicode.ambiguous_as_wide", False, validator=is_bool
)
default_options.register_pandas(
    "display.unicode.east_asian_width", False, validator=is_bool
)
default_options.redirect_option("display.height", "display.max_rows")
default_options.register_pandas(
    "display.width", 80, validator=any_validator(is_null, is_integer)
)
default_options.register_pandas("display.expand_frame_repr", True)
default_options.register_pandas(
    "display.show_dimensions", "truncate", validator=is_in([True, False, "truncate"])
)

default_options.register_option("display.notebook_widget", True, validator=is_bool)
default_options.redirect_option(
    "display.notebook_repr_widget", "display.notebook_widget"
)

# Mars
default_options.register_option("mars.use_common_proxy", True, validator=is_bool)
default_options.register_option("mars.launch_notebook", False, validator=is_bool)
default_options.register_option(
    "mars.to_dataframe_memory_scale", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option(
    "mars.container_status_timeout", 120, validator=is_integer
)


_options_local = threading.local()
if contextvars is not None:
    _options_ctx_var = contextvars.ContextVar("_options_ctx_var")
else:
    _options_ctx_var = None


def reset_global_options():
    global _options_local, _options_ctx_var
    if _options_ctx_var is not None:
        _options_ctx_var = contextvars.ContextVar("_options_ctx_var")
        _options_ctx_var.set(default_options)
    else:
        _options_local = threading.local()
        _options_local.default_options = default_options


reset_global_options()


def get_global_options(copy=False):
    if _options_ctx_var is not None:
        ret = _options_ctx_var.get(None)
    else:
        ret = getattr(_options_local, "default_options", None)

    if ret is None:
        if not copy:
            ret = default_options
        else:
            ret = Config(deepcopy(default_options._config))
        if _options_ctx_var is not None:
            _options_ctx_var.set(ret)
        else:
            _options_local.default_options = ret

    return ret


def set_global_options(opts):
    if _options_ctx_var is not None:
        _options_ctx_var.set(opts)
    else:
        _options_local.default_options = opts


@contextlib.contextmanager
def option_context(config=None):
    global_options = get_global_options(copy=True)

    try:
        config = config or dict()
        local_options = Config(deepcopy(global_options._config))
        local_options.update(config)
        set_global_options(local_options)
        yield local_options
    finally:
        set_global_options(global_options)


class OptionsProxy(object):
    def __dir__(self):
        return dir(get_global_options())

    def __getattribute__(self, attr):
        return getattr(get_global_options(), attr)

    def __setattr__(self, key, value):
        setattr(get_global_options(), key, value)


options = OptionsProxy()
