core/maxframe/config/config.py (442 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import contextlib import contextvars import traceback import warnings from copy import deepcopy from typing import Any, Dict, Optional, Union from odps.lib import tzlocal try: from zoneinfo import available_timezones except ImportError: from pytz import all_timezones available_timezones = lambda: all_timezones import logging from ..utils import get_python_tag from .validators import ( ValidatorType, all_validator, is_bool, is_dict, is_in, is_integer, is_non_negative_integer, is_null, is_numeric, is_string, is_valid_cache_path, ) logger = logging.getLogger(__name__) _DEFAULT_REDIRECT_WARN = "Option {source} has been replaced by {target} and might be removed in a future release." _DEFAULT_MAX_ALIVE_SECONDS = 3 * 24 * 3600 _DEFAULT_MAX_IDLE_SECONDS = 3600 _DEFAULT_SPE_OPERATION_TIMEOUT_SECONDS = 120 _DEFAULT_SPE_FAILURE_RETRY_TIMES = 5 _DEFAULT_UPLOAD_BATCH_SIZE = 4096 _DEFAULT_TEMP_LIFECYCLE = 1 _DEFAULT_TASK_START_TIMEOUT = 60 _DEFAULT_TASK_RESTART_TIMEOUT = 300 _DEFAULT_LOGVIEW_HOURS = 24 * 30 class OptionError(Exception): pass class Redirection: def __init__(self, item: str, warn: Optional[str] = None): self._items = item.split(".") self._warn = warn self._warned = True self._parent = None def bind(self, attr_dict): self._parent = attr_dict self.getvalue() self._warned = False def getvalue(self, silent: bool = False) -> Any: if not silent and self._warn and not self._warned: in_completer = any( 1 for st in traceback.extract_stack() if "completer" in st[0].lower() ) if not in_completer: self._warned = True warnings.warn(self._warn) conf = self._parent.root for it in self._items: conf = getattr(conf, it) return conf def setvalue(self, value: str, silent: bool = False) -> None: if not silent and self._warn and not self._warned: self._warned = True warnings.warn(self._warn) conf = self._parent.root for it in self._items[:-1]: conf = getattr(conf, it) setattr(conf, self._items[-1], value) class AttributeDict(dict): def __init__(self, *args, **kwargs): self._inited = False self._parent = kwargs.pop("_parent", None) self._root = None super().__init__(*args, **kwargs) self._inited = True @property def root(self): if self._root is not None: return self._root if self._parent is None: self._root = self else: self._root = self._parent.root return self._root def __getattr__(self, item: str): if item in self: val = self[item] if isinstance(val, AttributeDict): return val elif isinstance(val[0], Redirection): return val[0].getvalue() else: return val[0] return object.__getattribute__(self, item) def __dir__(self): return list(self.keys()) def register( self, key: str, value: Any, validator: Optional[ValidatorType] = None ) -> None: self[key] = value, validator if isinstance(value, Redirection): value.bind(self) def unregister(self, key: str) -> None: del self[key] def add_validator(self, key: str, validator: ValidatorType) -> None: value, old_validator = self[key] validators = getattr( old_validator, "validators", [old_validator] if callable(old_validator) else [], ) validators.append(validator) self[key] = (value, all_validator(*validators)) def _setattr(self, key: str, value: Any, silent: bool = False) -> None: if not silent and key not in self: raise OptionError(f"Cannot identify configuration name '{key}'.") if not isinstance(value, AttributeDict): validate = None if key in self: val = self[key] validate = self[key][1] if validate is not None: if not validate(value): raise ValueError(f"Cannot set value {value}") if isinstance(val[0], Redirection): val[0].setvalue(value) else: self[key] = value, validate else: self[key] = value, validate else: self[key] = value def __setattr__(self, key: str, value: Any): if key == "_inited": super().__setattr__(key, value) return try: object.__getattribute__(self, key) super().__setattr__(key, value) return except AttributeError: pass if not self._inited: super().__setattr__(key, value) else: self._setattr(key, value) def to_dict(self) -> Dict[str, Any]: result_dict = dict() for k, v in self.items(): if isinstance(v, AttributeDict): result_dict.update((f"{k}.{sk}", sv) for sk, sv in v.to_dict().items()) elif isinstance(v[0], Redirection): continue else: result_dict[k] = v[0] return result_dict class Config: def __init__(self, config=None): self._config = config or AttributeDict() self._remote_options = set() def __dir__(self): return list(self._config.keys()) def __getattr__(self, item: str): return getattr(self._config, item) def __setattr__(self, key: str, value: Any): if key.startswith("_"): object.__setattr__(self, key, value) return setattr(self._config, key, value) def register_option( self, option: str, value: Any, validator: Optional[ValidatorType] = None, remote: bool = False, ) -> None: assert validator is None or callable(validator) splits = option.split(".") conf = self._config for name in splits[:-1]: config = conf.get(name) if config is None: val = AttributeDict(_parent=conf) conf[name] = val conf = val elif not isinstance(config, dict): raise AttributeError( f"Fail to set option: {option}, conflict has encountered" ) else: conf = config key = splits[-1] if conf.get(key) is not None: raise AttributeError(f"Fail to set option: {option}, option has been set") conf.register(key, value, validator) if remote: self._remote_options.add(option) def redirect_option( self, option: str, target: str, warn: str = _DEFAULT_REDIRECT_WARN ) -> None: redir = Redirection(target, warn=warn.format(source=option, target=target)) self.register_option(option, redir) def unregister_option(self, option: str) -> None: splits = option.split(".") conf = self._config for name in splits[:-1]: config = conf.get(name) if not isinstance(config, dict): raise AttributeError( f"Fail to unregister option: {option}, conflict has encountered" ) else: conf = config key = splits[-1] if key not in conf: raise AttributeError( f"Option {option} not configured, thus failed to unregister." ) conf.unregister(key) def update(self, new_config: Union["Config", Dict[str, Any]]) -> None: if not isinstance(new_config, dict): new_config = new_config._config for option, value in new_config.items(): try: self.register_option(option, value) except AttributeError: attrs = option.split(".") cur_cfg = self for sub_cfg_name in attrs[:-1]: cur_cfg = getattr(cur_cfg, sub_cfg_name) setattr(cur_cfg, attrs[-1], value) def add_validator(self, option: str, validator: ValidatorType) -> None: splits = option.split(".") conf = self._config for name in splits[:-1]: config = conf.get(name) if not isinstance(config, dict): raise AttributeError( f"Fail to add validator: {option}, conflict has encountered" ) else: conf = config key = splits[-1] if key not in conf: raise AttributeError( f"Option {option} not configured, thus failed to set validator." ) conf.add_validator(key, validator) def to_dict(self, remote_only: bool = False) -> Dict[str, Any]: res = self._config.to_dict() if not remote_only: return res return {k: v for k, v in res.items() if k in self._remote_options} def _get_legal_local_tz_name() -> Optional[str]: """Sometimes we may get illegal tz name from tzlocal.get_localzone()""" tz_name = str(tzlocal.get_localzone()) if tz_name not in available_timezones(): return None return tz_name default_options = Config() default_options.register_option( "execution_mode", "trigger", validator=is_in(["trigger", "eager"]) ) default_options.register_option("use_common_table", False, validator=is_bool) default_options.register_option( "tunnel_quota_name", None, validator=is_string | is_null ) default_options.register_option( "python_tag", get_python_tag(), validator=is_string, remote=True ) default_options.register_option( "local_timezone", _get_legal_local_tz_name(), validator=is_null | is_in(set(available_timezones())), remote=True, ) default_options.register_option( "session.logview_hours", _DEFAULT_LOGVIEW_HOURS, validator=is_integer, remote=True ) default_options.register_option( "client.task_start_timeout", _DEFAULT_TASK_START_TIMEOUT, validator=is_integer ) default_options.register_option( "client.task_restart_timeout", _DEFAULT_TASK_RESTART_TIMEOUT, validator=is_integer ) default_options.register_option("sql.enable_mcqa", True, validator=is_bool, remote=True) default_options.register_option( "sql.generate_comments", True, validator=is_bool, remote=True ) default_options.register_option( "sql.auto_use_common_image", True, validator=is_bool, remote=True ) default_options.register_option("sql.settings", {}, validator=is_dict, remote=True) default_options.register_option("is_production", False, validator=is_bool, remote=True) default_options.register_option("schedule_id", "", validator=is_string, remote=True) default_options.register_option( "service_role_arn", None, validator=is_null | is_string, remote=True ) default_options.register_option( "object_cache_url", None, validator=is_null | is_valid_cache_path, remote=True ) default_options.register_option( "chunk_size", None, validator=is_null | is_integer, remote=True ) default_options.register_option( "session.max_alive_seconds", _DEFAULT_MAX_ALIVE_SECONDS, validator=is_numeric, remote=True, ) default_options.register_option( "session.max_idle_seconds", _DEFAULT_MAX_IDLE_SECONDS, validator=is_numeric, remote=True, ) default_options.register_option( "session.quota_name", None, validator=is_null | is_string, remote=True ) default_options.register_option( "session.enable_schema", None, validator=is_null | is_bool, remote=True ) default_options.register_option( "session.enable_high_availability", None, validator=is_null | is_bool, remote=True ) default_options.register_option( "session.default_schema", None, validator=is_null | is_string, remote=True ) default_options.register_option( "session.upload_batch_size", _DEFAULT_UPLOAD_BATCH_SIZE, validator=is_integer, ) default_options.register_option( "session.table_lifecycle", None, validator=is_null | is_integer, remote=True ) default_options.register_option( "session.temp_table_lifecycle", _DEFAULT_TEMP_LIFECYCLE, validator=is_integer, remote=True, ) default_options.register_option( "session.temp_table_properties", None, validator=is_null | is_dict, remote=True, ) default_options.register_option( "session.auto_purge_temp_tables", False, validator=is_bool, remote=True, ) default_options.register_option( "session.subinstance_priority", None, validator=is_null | is_integer, remote=True, ) default_options.register_option("warn_duplicated_execution", False, validator=is_bool) default_options.register_option("dataframe.use_arrow_dtype", True, validator=is_bool) default_options.register_option( "dataframe.arrow_array.pandas_only", True, validator=is_bool ) default_options.register_option( "optimize.head_optimize_threshold", 1000, validator=is_integer ) default_options.register_option("show_progress", "auto", validator=is_bool | is_string) default_options.register_option( "dag.settings", value=dict(), validator=is_dict, remote=True ) ################ # SPE Settings # ################ default_options.register_option( "spe.operation_timeout_seconds", _DEFAULT_SPE_OPERATION_TIMEOUT_SECONDS, validator=is_non_negative_integer, remote=True, ) default_options.register_option( "spe.failure_retry_times", _DEFAULT_SPE_FAILURE_RETRY_TIMES, validator=is_non_negative_integer, remote=True, ) default_options.register_option( "spe.task.settings", dict(), validator=is_dict, remote=True ) default_options.register_option( "pythonpack.task.settings", {}, validator=is_dict, remote=True ) _options_ctx_var = contextvars.ContextVar("_options_ctx_var") def reset_global_options(): global _options_ctx_var _options_ctx_var = contextvars.ContextVar("_options_ctx_var") _options_ctx_var.set(default_options) reset_global_options() def get_global_options(copy: bool = False) -> Config: ret = _options_ctx_var.get(None) if ret is None: if not copy: ret = default_options else: ret = Config(deepcopy(default_options._config)) _options_ctx_var.set(ret) return ret def set_global_options(opts: Config) -> None: _options_ctx_var.set(opts) @contextlib.contextmanager def option_context(config: Dict[str, Any] = None): global_options = get_global_options(copy=True) try: config = config or dict() local_options = Config(deepcopy(global_options._config)) local_options.update(config) set_global_options(local_options) yield local_options finally: set_global_options(global_options) class OptionsProxy: def __dir__(self): return dir(get_global_options()) def __getattribute__(self, attr): return getattr(get_global_options(), attr) def __setattr__(self, key, value): setattr(get_global_options(), key, value) options = OptionsProxy() def update_wlm_quota_settings(session_id: str, engine_settings: Dict[str, Any]): engine_quota = engine_settings.get("odps.task.wlm.quota", None) session_quota = options.session.quota_name or None if engine_quota != session_quota and engine_quota: logger.warning( "[Session=%s] Session quota (%s) is different to SubDag engine quota (%s)", session_id, session_quota, engine_quota, ) # TODO(renxiang): overwrite or not overwrite return if session_quota: engine_settings["odps.task.wlm.quota"] = session_quota elif "odps.task.wlm.quota" in engine_settings: engine_settings.pop("odps.task.wlm.quota")