in lerobot/common/utils/buffer.py [0:0]
def _get_async_iterator(self, batch_size: int, queue_size: int = 2):
"""
Create an iterator that continuously yields prefetched batches in a
background thread. The design is intentionally simple and avoids busy
waiting / complex state management.
Args:
batch_size (int): Size of batches to sample.
queue_size (int): Maximum number of prefetched batches to keep in
memory.
Yields:
BatchTransition: A batch sampled from the replay buffer.
"""
import queue
import threading
data_queue: queue.Queue = queue.Queue(maxsize=queue_size)
shutdown_event = threading.Event()
def producer() -> None:
"""Continuously put sampled batches into the queue until shutdown."""
while not shutdown_event.is_set():
try:
batch = self.sample(batch_size)
# The timeout ensures the thread unblocks if the queue is full
# and the shutdown event gets set meanwhile.
data_queue.put(batch, block=True, timeout=0.5)
except queue.Full:
# Queue is full – loop again (will re-check shutdown_event)
continue
except Exception:
# Surface any unexpected error and terminate the producer.
shutdown_event.set()
producer_thread = threading.Thread(target=producer, daemon=True)
producer_thread.start()
try:
while not shutdown_event.is_set():
try:
yield data_queue.get(block=True)
except Exception:
# If the producer already set the shutdown flag we exit.
if shutdown_event.is_set():
break
finally:
shutdown_event.set()
# Drain the queue quickly to help the thread exit if it's blocked on `put`.
while not data_queue.empty():
_ = data_queue.get_nowait()
# Give the producer thread a bit of time to finish.
producer_thread.join(timeout=1.0)