experimenter/typings/celery/worker/worker.pyi (86 lines of code) (raw):
"""
This type stub file was generated by pyright.
"""
from celery import bootsteps
"""WorkController can be used to instantiate in-process workers.
The command-line interface for the worker is in :mod:`celery.bin.worker`,
while the worker program is in :mod:`celery.apps.worker`.
The worker program is responsible for adding signal handlers,
setting up logging, etc. This is a bare-bones worker without
global side-effects (i.e., except for the global state stored in
:mod:`celery.worker.state`).
The worker consists of several components, all managed by bootsteps
(:mod:`celery.bootsteps`).
"""
# Public API: only ``WorkController`` is exported from this module.
__all__ = ("WorkController",)
# Module-level constants. The generated stub elides both their values and
# their types (``...``); consult the celery.worker.worker source for them.
SHUTDOWN_SOCKET_TIMEOUT = ...
SELECT_UNKNOWN_QUEUE = ...
DESELECT_UNKNOWN_QUEUE = ...
class WorkController:
    """Unmanaged worker instance.

    Stub declaration only: every body is ``...`` and all default values are
    elided by the stub generator. Methods without a return annotation have
    no return type recorded in this stub.
    """

    # Class-level attributes; values and types are elided by the generator.
    app = ...
    pidlock = ...
    blueprint = ...
    pool = ...
    semaphore = ...
    exitcode = ...

    class Blueprint(bootsteps.Blueprint):
        """Worker bootstep blueprint."""

        name = ...
        default_steps = ...

    def __init__(self, app=..., hostname=..., **kwargs) -> None: ...

    def setup_instance(
        self,
        queues=...,
        ready_callback=...,
        pidfile=...,
        include=...,
        use_eventloop=...,
        exclude_queues=...,
        **kwargs,
    ): ...

    # Lifecycle hooks. NOTE(review): presumably invoked around blueprint
    # initialisation / start / shutdown -- confirm against the celery source.
    def on_init_blueprint(self): ...
    def on_before_init(self, **kwargs): ...
    def on_after_init(self, **kwargs): ...
    def on_start(self): ...
    def on_consumer_ready(self, consumer): ...
    def on_close(self): ...
    def on_stopped(self): ...

    def setup_queues(self, include, exclude=...): ...
    def setup_includes(self, includes): ...
    def prepare_args(self, **kwargs): ...
    def start(self): ...
    def register_with_event_loop(self, hub): ...
    def signal_consumer_close(self): ...
    def should_use_eventloop(self): ...

    # The return annotations on ``stop``, ``terminate`` and ``__repr__`` are
    # the types the stub generator had inferred and left in trailing
    # comments; they are declared here so type checkers actually see them.
    def stop(self, in_sighandler=..., exitcode=...) -> None:
        """Graceful shutdown of the worker server."""
        ...

    def terminate(self, in_sighandler=...) -> None:
        """Not so graceful shutdown of the worker server."""
        ...

    def reload(self, modules=..., reload=..., reloader=...): ...
    def info(self): ...
    def rusage(self): ...
    def stats(self): ...

    def __repr__(self) -> str:
        """``repr(worker)``."""
        ...

    def __str__(self) -> str:
        """``str(worker) == worker.hostname``."""
        ...

    @property
    def state(self): ...

    # Accepts both spellings of several options (e.g. ``statedb`` and
    # ``state_db``, ``time_limit`` and ``task_time_limit``, ``pool`` and
    # ``pool_cls``). NOTE(review): presumably old/new setting-name aliases
    # -- confirm against the celery source before relying on either one.
    def setup_defaults(
        self,
        concurrency=...,
        loglevel=...,
        logfile=...,
        task_events=...,
        pool=...,
        consumer_cls=...,
        timer_cls=...,
        timer_precision=...,
        autoscaler_cls=...,
        pool_putlocks=...,
        pool_restarts=...,
        optimization=...,
        O=...,
        statedb=...,
        time_limit=...,
        soft_time_limit=...,
        scheduler=...,
        pool_cls=...,
        state_db=...,
        task_time_limit=...,
        task_soft_time_limit=...,
        scheduler_cls=...,
        schedule_filename=...,
        max_tasks_per_child=...,
        prefetch_multiplier=...,
        disable_rate_limits=...,
        worker_lost_wait=...,
        max_memory_per_child=...,
        **_kw,
    ): ...