maga_transformer/utils/thread_safe_deque.py (33 lines of code) (raw):

from collections import deque import threading class ThreadSafeDeque(): def __init__(self): self.deque = deque() self.lock = threading.Lock() def append(self, item): with self.lock: return self.deque.append(item) def appendleft(self, item): with self.lock: return self.deque.appendleft(item) def pop(self): with self.lock: return self.deque.pop() def popleft(self): with self.lock: return self.deque.popleft() def remove(self, item): with self.lock: return self.deque.remove(item) def __len__(self): with self.lock: return len(self.deque) def copy(self): with self.lock: return self.deque.copy() # 确保对原始 deque 的直接访问是线程安全的 def __getitem__(self, position): with self.lock: return self.deque[position] def __iter__(self): with self.lock: # 创建一个列表副本用于安全的迭代 return iter(list(self.deque))