stubs/sqlalchemy/util/queue.pyi (18 lines of code) (raw):

import threading
from typing import Any, Deque, Optional


# Exception types exported by the queue module. Their names mirror the
# standard library's ``queue.Empty`` / ``queue.Full``; presumably they are
# raised by the non-blocking or timed-out ``get`` / ``put`` paths --
# NOTE(review): confirm against the runtime module.
class Empty(Exception): ...
class Full(Exception): ...


# Typed interface of the queue class in ``sqlalchemy.util.queue``.
# Return annotations follow the runtime implementation so that callers get
# precise types instead of an implicit ``Any``.
class Queue(object):
    # Synchronisation primitives exposed as attributes.
    mutex: threading.RLock
    not_empty: threading.Condition
    not_full: threading.Condition
    # Capacity passed to the constructor.
    # NOTE(review): a non-positive value presumably means "unbounded", as in
    # the stdlib queue -- confirm against the runtime module.
    maxsize: int
    # Underlying container holding the queued items.
    queue: Deque[Any]

    def __init__(self, maxsize: int = ...) -> None: ...

    # Introspection helpers: current item count and emptiness/fullness flags.
    def qsize(self) -> int: ...
    def empty(self) -> bool: ...
    def full(self) -> bool: ...

    # Insertion. ``timeout`` keeps its original permissive annotation so that
    # existing callers continue to type-check unchanged.
    def put(self, item: Any, block: bool = ..., timeout: Optional[Any] = ...) -> None: ...
    def put_nowait(self, item: Any) -> None: ...

    # Removal. Items are untyped, so the result is ``Any``.
    def get(self, block: bool = ..., timeout: Optional[Any] = ...) -> Any: ...
    def get_nowait(self) -> Any: ...