odps/config.py (640 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import contextlib
import logging
import threading
import traceback
import warnings
from copy import deepcopy
from .compat import six
try:
import contextvars
except ImportError:
contextvars = None
DEFAULT_BLOCK_BUFFER_SIZE = 20 * 1024**2
DEFAULT_CHUNK_SIZE = 65536
DEFAULT_CONNECT_RETRY_TIMES = 4
DEFAULT_CONNECT_TIMEOUT = 120
DEFAULT_READ_TIMEOUT = 120
DEFAULT_POOL_CONNECTIONS = 10
DEFAULT_POOL_MAXSIZE = 10
DEFAULT_RETRY_DELAY = 0.1
_DEFAULT_REDIRECT_WARN = "Option {source} has been replaced by {target} and might be removed in a future release."
class OptionError(AttributeError):
pass
class Redirection(object):
def __init__(self, item, warn=None):
self._items = item.split(".")
self._warn = warn
self._warned = True
self._parent = None
def bind(self, attr_dict):
self._parent = attr_dict
self.getvalue()
self._warned = False
def getvalue(self, silent=False):
if not silent and self._warn and not self._warned:
in_completer = any(
1 for st in traceback.extract_stack() if "completer" in st[0].lower()
)
if not in_completer:
self._warned = True
warnings.warn(self._warn)
conf = self._parent.root
for it in self._items:
conf = getattr(conf, it)
return conf
def setvalue(self, value, silent=False):
if not silent and self._warn and not self._warned:
self._warned = True
warnings.warn(self._warn)
conf = self._parent.root
for it in self._items[:-1]:
conf = getattr(conf, it)
setattr(conf, self._items[-1], value)
class PandasRedirection(Redirection):
def __init__(self, *args, **kw):
super(PandasRedirection, self).__init__(*args, **kw)
self._val = None
try:
import pandas # noqa: F401
self._use_pd = True
except (ImportError, ValueError):
self._use_pd = False
def getvalue(self, silent=False):
if self._use_pd:
import pandas as pd
try:
return pd.get_option(".".join(self._items))
except (KeyError, LookupError, AttributeError):
self._use_pd = False
else:
return self._val
def setvalue(self, value, silent=False):
if self._use_pd:
import pandas as pd
key = ".".join(self._items)
if value != pd.get_option(key):
pd.set_option(key, value)
else:
self._val = value
class AttributeDict(dict):
def __init__(self, *args, **kwargs):
self._inited = False
self._parent = kwargs.pop("_parent", None)
self._root = None
super(AttributeDict, self).__init__(*args, **kwargs)
self._inited = True
@property
def root(self):
if self._root is not None:
return self._root
if self._parent is None:
self._root = self
else:
self._root = self._parent.root
return self._root
def __getattr__(self, item):
if item in self:
val = self[item]
if isinstance(val, AttributeDict):
return val
elif isinstance(val[0], Redirection):
return val[0].getvalue()
else:
return val[0]
return object.__getattribute__(self, item)
def __dir__(self):
return list(six.iterkeys(self))
def register(self, key, value, validator=None):
self[key] = value, validator
if isinstance(value, Redirection):
value.bind(self)
def unregister(self, key):
del self[key]
def add_validator(self, key, validator):
value, old_validator = self[key]
validators = getattr(
old_validator,
"validators",
[old_validator] if callable(old_validator) else [],
)
validators.append(validator)
self[key] = (value, all_validator(*validators))
def _setattr(self, key, value, silent=False):
if not silent and key not in self:
raise OptionError("Cannot identify configuration name '%s'." % str(key))
if not isinstance(value, AttributeDict):
validate = None
if key in self:
val = self[key]
validate = self[key][1]
if validate is not None:
if not validate(value):
raise ValueError("Cannot set value %s" % value)
if isinstance(val[0], Redirection):
val[0].setvalue(value)
else:
self[key] = value, validate
else:
self[key] = value, validate
else:
self[key] = value
def __setattr__(self, key, value):
if key == "_inited":
super(AttributeDict, self).__setattr__(key, value)
return
try:
object.__getattribute__(self, key)
super(AttributeDict, self).__setattr__(key, value)
return
except AttributeError:
pass
if not self._inited:
super(AttributeDict, self).__setattr__(key, value)
else:
self._setattr(key, value)
def loads(self, d):
dispatches = collections.defaultdict(dict)
for k, v in six.iteritems(d):
if "." in k:
sk, rk = k.split(".", 1)
dispatches[sk][rk] = v
elif isinstance(self[k][0], Redirection):
self[k][0].setvalue(v, silent=True)
else:
setattr(self, k, v)
for k, v in six.iteritems(dispatches):
self[k].loads(v)
def dumps(self):
from .accounts import BaseAccount
result_dict = dict()
for k, v in six.iteritems(self):
if isinstance(v, AttributeDict):
result_dict.update(
(k + "." + sk, sv) for sk, sv in six.iteritems(v.dumps())
)
elif isinstance(v[0], BaseAccount) or callable(v[0]):
# ignore accounts in config dumps
result_dict[k] = None
elif isinstance(v[0], Redirection):
result_dict[k] = v[0].getvalue(silent=True)
else:
result_dict[k] = v[0]
return result_dict
class Config(object):
def __init__(self, config=None):
self._config = config or AttributeDict()
def __dir__(self):
return list(six.iterkeys(self._config))
def __getattr__(self, item):
return getattr(self._config, item)
def __setattr__(self, key, value):
if key == "_config":
object.__setattr__(self, key, value)
return
setattr(self._config, key, value)
def register_option(self, option, value, validator=None):
assert validator is None or callable(validator)
splits = option.split(".")
conf = self._config
for name in splits[:-1]:
config = conf.get(name)
if config is None:
val = AttributeDict(_parent=conf)
conf[name] = val
conf = val
elif not isinstance(config, dict):
raise AttributeError(
"Fail to set option: %s, conflict has encountered" % option
)
else:
conf = config
key = splits[-1]
if conf.get(key) is not None:
raise AttributeError("Fail to set option: %s, option has been set" % option)
conf.register(key, value, validator)
def register_pandas(self, option, value=None, validator=None):
redir = PandasRedirection(option)
self.register_option(option, redir, validator=validator)
if value is not None:
redir.setvalue(value)
def redirect_option(self, option, target, warn=_DEFAULT_REDIRECT_WARN):
redir = Redirection(target, warn=warn.format(source=option, target=target))
self.register_option(option, redir)
def unregister_option(self, option):
splits = option.split(".")
conf = self._config
for name in splits[:-1]:
config = conf.get(name)
if not isinstance(config, dict):
raise AttributeError(
"Fail to unregister option: %s, conflict has encountered" % option
)
else:
conf = config
key = splits[-1]
if key not in conf:
raise AttributeError(
"Option %s not configured, thus failed to unregister." % option
)
conf.unregister(key)
def update(self, new_config):
if not isinstance(new_config, dict):
new_config = new_config._config
for option, value in new_config.items():
try:
self.register_option(option, value)
except AttributeError:
setattr(self, option, value)
def add_validator(self, option, validator):
splits = option.split(".")
conf = self._config
for name in splits[:-1]:
config = conf.get(name)
if not isinstance(config, dict):
raise AttributeError(
"Fail to add validator: %s, conflict has encountered" % option
)
else:
conf = config
key = splits[-1]
if key not in conf:
raise AttributeError(
"Option %s not configured, thus failed to set validator." % option
)
conf.add_validator(key, validator)
def loads(self, d):
return self._config.loads(d)
def dumps(self):
return self._config.dumps()
def is_interactive():
import __main__ as main
return not hasattr(main, "__file__")
# validators
def any_validator(*validators):
def validate(x):
return any(validator(x) for validator in validators)
return validate
def all_validator(*validators):
def validate(x):
return all(validator(x) for validator in validators)
validate.validators = validators
return validate
is_null = lambda x: x is None
is_bool = lambda x: isinstance(x, bool)
is_float = lambda x: isinstance(x, float)
is_integer = lambda x: isinstance(x, six.integer_types)
is_string = lambda x: isinstance(x, six.string_types)
is_dict = lambda x: isinstance(x, dict)
is_list = lambda x: isinstance(x, list)
def is_in(vals):
def validate(x):
return x in vals
return validate
def show_deprecated(msg):
def validator(x):
warnings.warn(msg, DeprecationWarning)
return x
return validator
def verbose_log_validator(val):
if options.verbose == val:
# no flip, return directly
return True
logging.basicConfig()
odps_logger = logging.getLogger("odps")
odps_logger.propagate = False
if _stream_handler not in odps_logger.handlers:
odps_logger.addHandler(_stream_handler)
if val:
logging.basicConfig()
old_effective_level = odps_logger.getEffectiveLevel() or logging.WARNING
target_level = min(logging.INFO, old_effective_level)
odps_logger.oldLevel = odps_logger.level
odps_logger.setLevel(target_level)
_stream_handler.oldLevel = _stream_handler.level
_stream_handler.setLevel(target_level)
odps_logger.addHandler(_verbose_log_handler)
else:
old_level = getattr(odps_logger, "oldLevel", logging.NOTSET)
odps_logger.setLevel(old_level)
old_level = getattr(_stream_handler, "oldLevel", logging.NOTSET)
_stream_handler.setLevel(old_level)
odps_logger.removeHandler(_verbose_log_handler)
return True
class VerboseLogHandler(logging.StreamHandler):
def emit(self, record):
try:
if options.verbose_log:
msg = self.format(record)
options.verbose_log(msg)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
_stream_handler = logging.StreamHandler()
_verbose_log_handler = VerboseLogHandler()
default_options = Config()
default_options.register_option(
"is_global_account_overwritable", True, validator=is_bool
)
default_options.register_option("account", None)
default_options.register_option("endpoint", None)
default_options.redirect_option("end_point", "endpoint")
default_options.register_option("default_project", None)
default_options.register_option("default_schema", None)
default_options.register_option("default_namespace", None)
default_options.register_option("app_account", None)
default_options.register_option("region_name", None)
default_options.register_option("quota_name", None)
default_options.register_option("local_timezone", None)
default_options.register_option("use_legacy_parsedate", False)
default_options.register_option("allow_antique_date", False)
default_options.register_option(
"user_agent_pattern",
"$pyodps_version $python_version $os_version $maxframe_version",
)
default_options.register_option(
"use_legacy_logview", True, validator=any_validator(is_bool, is_null)
)
default_options.register_option("logview_host", None)
default_options.register_option("logview_hours", 24 * 30, validator=is_integer)
default_options.redirect_option("log_view_host", "logview_host")
default_options.redirect_option("log_view_hours", "logview_hours")
default_options.register_option("api_proxy", None)
default_options.register_option("data_proxy", None)
default_options.redirect_option("tunnel_proxy", "data_proxy")
default_options.register_option("seahawks_url", None)
default_options.register_option("biz_id", None)
default_options.register_option(
"priority", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option("get_priority", None)
default_options.register_option("temp_lifecycle", 1, validator=is_integer)
default_options.register_option(
"lifecycle", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option(
"table_read_limit", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option("completion_size", 10, validator=is_integer)
default_options.register_option(
"default_task_settings", None, validator=any_validator(is_null, is_dict)
)
default_options.register_option("resource_chunk_size", 64 << 20, validator=is_integer)
default_options.register_option("upload_resource_in_chunks", True, validator=is_bool)
default_options.register_option("verify_ssl", True)
default_options.register_option("enable_schema", False, validator=is_bool)
default_options.redirect_option("always_enable_schema", "enable_schema")
default_options.register_option("table_auto_flush_time", 150, validator=is_integer)
default_options.register_option("struct_as_dict", False, validator=is_bool)
default_options.register_option(
"struct_as_ordered_dict", None, validator=any_validator(is_bool, is_null)
)
default_options.register_option(
"map_as_ordered_dict", None, validator=any_validator(is_bool, is_null)
)
default_options.register_option(
"logview_latency", 10, validator=any_validator(is_float, is_integer)
)
default_options.register_option(
"progress_time_interval", 5 * 60, validator=any_validator(is_float, is_integer)
)
default_options.register_option("progress_percentage_gap", 5, validator=is_integer)
default_options.register_option("enable_v4_sign", False, validator=is_bool)
default_options.register_option("align_supported_python_tag", True, validator=is_bool)
# c or python mode, use for UT, in other cases, please do not modify the value
default_options.register_option("force_c", False, validator=is_integer)
default_options.register_option("force_py", False, validator=is_integer)
# callbacks for wrappers
default_options.register_option("instance_create_callback", None)
default_options.register_option("tunnel_session_create_callback", None)
default_options.register_option("tunnel_session_create_timeout_callback", None)
default_options.register_option("result_reader_create_callback", None)
default_options.register_option("tunnel_read_timeout_callback", None)
default_options.register_option("skipped_survey_regexes", [])
# network connections
default_options.register_option("chunk_size", DEFAULT_CHUNK_SIZE, validator=is_integer)
default_options.register_option(
"retry_times", DEFAULT_CONNECT_RETRY_TIMES, validator=is_integer
)
default_options.register_option(
"retry_delay", DEFAULT_RETRY_DELAY, validator=any_validator(is_integer, is_float)
)
default_options.register_option(
"connect_timeout", DEFAULT_CONNECT_TIMEOUT, validator=is_integer
)
default_options.register_option(
"read_timeout", DEFAULT_READ_TIMEOUT, validator=is_integer
)
default_options.register_option(
"pool_connections", DEFAULT_POOL_CONNECTIONS, validator=is_integer
)
default_options.register_option(
"pool_maxsize", DEFAULT_POOL_MAXSIZE, validator=is_integer
)
# Tunnel
default_options.register_option("tunnel.endpoint", None)
default_options.register_option("tunnel.string_as_binary", False, validator=is_bool)
default_options.register_option("tunnel.use_instance_tunnel", True, validator=is_bool)
default_options.register_option(
"tunnel.limit_instance_tunnel", None, validator=any_validator(is_null, is_bool)
)
default_options.register_option(
"tunnel.legacy_fallback_timeout", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option(
"tunnel.pd_mem_cache_size", 1024 * 4, validator=is_integer
)
default_options.register_option(
"tunnel.pd_row_cache_size", 1024 * 16, validator=is_integer
)
default_options.register_option(
"tunnel.pd_cast_mode", None, validator=is_in([None, "numpy", "arrow"])
)
default_options.register_option(
"tunnel.read_row_batch_size", 1024, validator=is_integer
)
default_options.register_option(
"tunnel.write_row_batch_size", 1024, validator=is_integer
)
default_options.register_option(
"tunnel.batch_merge_threshold", 128, validator=is_integer
)
default_options.register_option(
"tunnel.overflow_date_as_none", False, validator=is_bool
)
default_options.register_option(
"tunnel.quota_name", None, validator=any_validator(is_null, is_string)
)
default_options.register_option(
"tunnel.block_buffer_size", DEFAULT_BLOCK_BUFFER_SIZE, validator=is_integer
)
default_options.register_option(
"tunnel.use_block_writer_by_default", False, validator=is_bool
)
default_options.register_option(
"tunnel.tags", None, validator=any_validator(is_null, is_string, is_list)
)
default_options.register_option(
"tunnel.enable_client_metrics", False, validator=any_validator(is_null, is_bool)
)
default_options.register_option("tunnel.compress.enabled", False, validator=is_bool)
default_options.register_option(
"tunnel.compress.algo", None, validator=any_validator(is_null, is_string)
)
default_options.register_option("tunnel.compress.level", 1, validator=is_integer)
default_options.register_option("tunnel.compress.strategy", 0, validator=is_integer)
default_options.redirect_option("tunnel_endpoint", "tunnel.endpoint")
default_options.redirect_option("use_instance_tunnel", "tunnel.use_instance_tunnel")
default_options.redirect_option(
"limited_instance_tunnel", "tunnel.limit_instance_tunnel"
)
default_options.redirect_option(
"tunnel.limited_instance_tunnel", "tunnel.limit_instance_tunnel"
)
# terminal
default_options.register_option("console.max_lines", None)
default_options.register_option("console.max_width", None)
default_options.register_option("console.use_color", False, validator=is_bool)
# SQL
default_options.register_option(
"sql.settings", None, validator=any_validator(is_null, is_dict)
)
default_options.register_option("sql.ignore_fields_not_null", False, validator=is_bool)
default_options.register_option(
"sql.use_odps2_extension", None, validator=any_validator(is_null, is_bool)
)
# sqlalchemy
default_options.register_option(
"sqlalchemy.project_as_schema", None, validator=any_validator(is_null, is_bool)
)
# DataFrame
default_options.register_option("interactive", is_interactive(), validator=is_bool)
default_options.register_option(
"verbose", False, validator=all_validator(is_bool, verbose_log_validator)
)
default_options.register_option("verbose_log", None)
default_options.register_option("df.optimize", True, validator=is_bool)
default_options.register_option("df.optimizes.cp", True, validator=is_bool)
default_options.register_option("df.optimizes.pp", True, validator=is_bool)
default_options.register_option("df.optimizes.tunnel", True, validator=is_bool)
default_options.register_option("df.analyze", True, validator=is_bool)
default_options.register_option("df.use_cache", True, validator=is_bool)
default_options.register_option("df.quote", True, validator=is_bool)
default_options.register_option("df.dump_udf", False, validator=is_bool)
default_options.register_option("df.supersede_libraries", True, validator=is_bool)
default_options.register_option("df.libraries", None)
default_options.register_option("df.image", None)
default_options.register_option("df.odps.sort.limit", 10000)
default_options.register_option(
"df.odps.nan_handler", "py"
) # None for not handled, builtin for built-in ISNAN function
default_options.register_option(
"df.sqlalchemy.execution_options", None, validator=any_validator(is_null, is_dict)
)
default_options.register_option("df.seahawks.max_size", 10 * 1024 * 1024 * 1024) # 10G
default_options.register_option("df.delete_udfs", True, validator=is_bool)
default_options.register_option("df.use_xflow_sample", False, validator=is_bool)
default_options.register_option("df.writer_count_limit", 50, validator=is_integer)
# PyODPS ML
default_options.register_option("ml.xflow_project", "algo_public", validator=is_string)
default_options.register_option(
"ml.xflow_settings", None, validator=any_validator(is_null, is_dict)
)
default_options.register_option("ml.dry_run", False, validator=is_bool)
default_options.register_option("ml.use_model_transfer", False, validator=is_bool)
default_options.register_option("ml.use_old_metrics", True, validator=is_bool)
default_options.register_option("ml.model_volume", "pyodps_volume", validator=is_string)
# Runner
default_options.redirect_option("runner.dry_run", "ml.dry_run")
# display
from .console import detect_console_encoding
default_options.register_pandas(
"display.encoding", detect_console_encoding(), validator=is_string
)
default_options.register_pandas(
"display.max_rows", 60, validator=any_validator(is_null, is_integer)
)
default_options.register_pandas(
"display.max_columns", 20, validator=any_validator(is_null, is_integer)
)
default_options.register_pandas(
"display.large_repr", "truncate", validator=is_in(["truncate", "info"])
)
default_options.register_pandas("display.notebook_repr_html", True, validator=is_bool)
default_options.register_pandas("display.precision", 6, validator=is_integer)
default_options.register_pandas("display.float_format", None)
default_options.register_pandas("display.chop_threshold", None)
default_options.register_pandas("display.column_space", 12, validator=is_integer)
default_options.register_pandas("display.pprint_nest_depth", 3, validator=is_integer)
default_options.register_pandas("display.max_seq_items", 100, validator=is_integer)
default_options.register_pandas("display.max_colwidth", 50, validator=is_integer)
default_options.register_pandas("display.multi_sparse", True, validator=is_bool)
default_options.register_pandas(
"display.colheader_justify", "right", validator=is_string
)
default_options.register_pandas(
"display.unicode.ambiguous_as_wide", False, validator=is_bool
)
default_options.register_pandas(
"display.unicode.east_asian_width", False, validator=is_bool
)
default_options.redirect_option("display.height", "display.max_rows")
default_options.register_pandas(
"display.width", 80, validator=any_validator(is_null, is_integer)
)
default_options.register_pandas("display.expand_frame_repr", True)
default_options.register_pandas(
"display.show_dimensions", "truncate", validator=is_in([True, False, "truncate"])
)
default_options.register_option("display.notebook_widget", True, validator=is_bool)
default_options.redirect_option(
"display.notebook_repr_widget", "display.notebook_widget"
)
# Mars
default_options.register_option("mars.use_common_proxy", True, validator=is_bool)
default_options.register_option("mars.launch_notebook", False, validator=is_bool)
default_options.register_option(
"mars.to_dataframe_memory_scale", None, validator=any_validator(is_null, is_integer)
)
default_options.register_option(
"mars.container_status_timeout", 120, validator=is_integer
)
_options_local = threading.local()
if contextvars is not None:
_options_ctx_var = contextvars.ContextVar("_options_ctx_var")
else:
_options_ctx_var = None
def reset_global_options():
global _options_local, _options_ctx_var
if _options_ctx_var is not None:
_options_ctx_var = contextvars.ContextVar("_options_ctx_var")
_options_ctx_var.set(default_options)
else:
_options_local = threading.local()
_options_local.default_options = default_options
reset_global_options()
def get_global_options(copy=False):
if _options_ctx_var is not None:
ret = _options_ctx_var.get(None)
else:
ret = getattr(_options_local, "default_options", None)
if ret is None:
if not copy:
ret = default_options
else:
ret = Config(deepcopy(default_options._config))
if _options_ctx_var is not None:
_options_ctx_var.set(ret)
else:
_options_local.default_options = ret
return ret
def set_global_options(opts):
if _options_ctx_var is not None:
_options_ctx_var.set(opts)
else:
_options_local.default_options = opts
@contextlib.contextmanager
def option_context(config=None):
global_options = get_global_options(copy=True)
try:
config = config or dict()
local_options = Config(deepcopy(global_options._config))
local_options.update(config)
set_global_options(local_options)
yield local_options
finally:
set_global_options(global_options)
class OptionsProxy(object):
def __dir__(self):
return dir(get_global_options())
def __getattribute__(self, attr):
return getattr(get_global_options(), attr)
def __setattr__(self, key, value):
setattr(get_global_options(), key, value)
options = OptionsProxy()