experimenter/typings/celery/events/cursesmon.pyi (67 lines of code) (raw):
"""
This type stub file was generated by pyright.
"""
import threading
"""Graphical monitor of Celery events using curses."""
__all__ = ("CursesMonitor", "evtop")
BORDER_SPACING = ...
LEFT_BORDER_OFFSET = ...
UUID_WIDTH = ...
STATE_WIDTH = ...
TIMESTAMP_WIDTH = ...
MIN_WORKER_WIDTH = ...
MIN_TASK_WIDTH = ...
STATUS_SCREEN = ...
class CursesMonitor:
"""A curses based Celery task monitor."""
keymap = ...
win = ...
screen_delay = ...
selected_task = ...
selected_position = ...
selected_str = ...
foreground = ...
background = ...
online_str = ...
help_title = ...
help = ...
greet = ...
info_str = ...
def __init__(self, state, app, keymap=...) -> None: ...
def format_row(self, uuid, task, worker, timestamp, state): ...
@property
def screen_width(self): ...
@property
def screen_height(self): ...
@property
def display_width(self): ...
@property
def display_height(self): ...
@property
def limit(self): ...
def find_position(self): ...
def move_selection_up(self): ...
def move_selection_down(self): ...
def move_selection(self, direction=...): ...
keyalias = ...
def handle_keypress(self): ...
def alert(self, callback, title=...): ...
def selection_rate_limit(self): ...
def alert_remote_control_reply(self, reply): ...
def readline(self, x, y): ...
def revoke_selection(self): ...
def selection_info(self): ...
def selection_traceback(self): ...
def selection_result(self): ...
def display_task_row(self, lineno, task): ...
def draw(self): ...
def safe_add_str(self, y, x, string, *args, **kwargs): ...
def init_screen(self): ...
def resetscreen(self): ...
def nap(self): ...
@property
def tasks(self): ...
@property
def workers(self): ...
class DisplayThread(threading.Thread):
def __init__(self, display) -> None: ...
def run(self): ...
def capture_events(app, state, display): ...
def evtop(app=...): # -> None:
"""Start curses monitor."""
...
if __name__ == "__main__": ...