in src/agents/tracing/processors.py [0:0]
def _export_batches(self, force: bool = False):
    """Drain the queue, handing its contents to the exporter in batches.

    Each pass pulls items off ``self._queue`` and gives them to
    ``self._exporter.export`` as a single list. Passes repeat until one of
    them finds nothing to pull.

    Args:
        force: When false, a batch is capped at ``self._max_batch_size``
            items. When true, the cap is ignored and a pass keeps pulling
            until the queue reports empty.
    """
    while True:
        batch: list[Span[Any] | Trace] = []

        # Fill one batch.
        while True:
            if self._queue.empty():
                break
            if not force and len(batch) >= self._max_batch_size:
                break
            try:
                item = self._queue.get_nowait()
            except queue.Empty:
                # The queue was drained by someone else after the empty() check.
                break
            batch.append(item)

        if not batch:
            # Nothing was pulled on this pass, so there is nothing left to send.
            return

        self._exporter.export(batch)