experimenter/typings/celery/app/utils.pyi (69 lines of code) (raw):
"""
This type stub file was generated by pyright.
"""
from celery.utils.collections import ConfigurationView
"""App utilities: Compat settings, bug-report tool, pickling apps."""
# Public names exported by this module.
__all__ = ("Settings", "appstr", "bugreport", "filter_hidden_settings", "find_app")
# Module-level constants. This is a generated type stub, so each value is
# elided with ``...`` and only the name is declared here.
BUGREPORT_INFO = ...
HIDDEN_SETTINGS = ...
E_MIX_OLD_INTO_NEW = ...
E_MIX_NEW_INTO_OLD = ...
FMT_REPLACE_SETTING = ...
def appstr(app) -> str:
    """String used in __repr__ etc, to id app instances.

    Arguments:
        app: The app instance to describe.

    Returns:
        str: The identifying string (return type as inferred by pyright).
    """
    ...
class Settings(ConfigurationView):
    """Celery settings object.

    .. seealso::

        :ref:`configuration` for a full list of configuration keys.
    """

    def __init__(self, *args, deprecated_settings=..., **kwargs) -> None: ...

    # Read-only properties; the stub does not record their value types.
    @property
    def broker_read_url(self): ...
    @property
    def broker_write_url(self): ...
    @property
    def broker_url(self): ...
    @property
    def result_backend(self): ...
    @property
    def task_default_exchange(self): ...
    @property
    def task_default_routing_key(self): ...
    @property
    def timezone(self): ...

    def without_defaults(self) -> Settings:
        """Return the current configuration, but without defaults."""
        ...

    def value_set_for(self, key): ...

    def find_option(self, name, namespace=...):  # -> searchresult:
        """Search for option by name.

        Example:
            >>> from proj.celery import app
            >>> app.conf.find_option("disable_rate_limits")
            ('worker', 'disable_rate_limits',
             <Option: type->bool default->False>)

        Arguments:
            name (str): Name of option, cannot be partial.
            namespace (str): Preferred name-space (``None`` by default).

        Returns:
            Tuple: of ``(namespace, key, type)``.
        """
        ...

    def find_value_for_key(self, name, namespace=...):  # -> Any:
        """Shortcut to ``get_by_parts(*find_option(name)[:-1])``."""
        ...

    def get_by_parts(self, *parts):  # -> Any:
        """Return the current value for setting specified as a path.

        Example:
            >>> from proj.celery import app
            >>> app.conf.get_by_parts("worker", "disable_rate_limits")
            False
        """
        ...

    def finalize(self): ...

    def table(self, with_defaults=..., censored=...): ...

    def humanize(self, with_defaults=..., censored=...) -> str:
        """Return a human readable text showing configuration changes."""
        ...

    def maybe_warn_deprecated_settings(self): ...
# Private module-level names; their values are elided (``...``) in this stub.
_settings_info_t = ...
_settings_info = ...
_old_settings_info = ...
def detect_settings(
    conf,
    preconf=...,
    ignore_keys=...,
    prefix=...,
    all_keys=...,
    old_keys=...,
):
    # Signature-only stub: defaults are elided with ``...`` and no return
    # type is recorded for this function.
    ...
class AppPickler:
    """Old application pickler/unpickler (< 3.1)."""

    # Signature-only stubs: bodies and return types are not recorded here.
    def __call__(self, cls, *args): ...

    def prepare(self, app, **kwargs): ...

    def build_kwargs(self, *args): ...

    def build_standard_kwargs(
        self,
        main,
        changes,
        loader,
        backend,
        amqp,
        events,
        log,
        control,
        accept_magic_kwargs,
        config_source=...,
    ): ...

    def construct(self, cls, **kwargs): ...
def filter_hidden_settings(conf):  # -> dict[Unknown, Unknown | str]:
    """Filter sensitive settings.

    Arguments:
        conf: The configuration to filter.

    The trailing comment is pyright's inferred return type; it contains
    ``Unknown`` and so is kept as a comment rather than an annotation.
    """
    ...
def bugreport(app) -> str:
    """Return a string containing information useful in bug-reports.

    Arguments:
        app: The app instance to report on.
    """
    ...
def find_app(app, symbol_by_name=..., imp=...):  # -> Any | Celery | ModuleType:
    """Find app by name.

    Arguments:
        app: The app to look up.
        symbol_by_name: Default elided in this stub.
        imp: Default elided in this stub.

    The trailing comment is pyright's inferred return type; it is kept as a
    comment because it refers to names this stub does not import.
    """
    ...