in petastorm/reader_impl/pytorch_shuffling_buffer.py [0:0]
def _add_many(self, items):
    """Append a batch of samples to the shuffling buffer, growing it as needed.

    ``items`` is a list of torch tensors, one per field, each batched along
    dimension 0 (all entries share the same leading length, taken from
    ``items[0]``). The per-field storage tensors in ``self._items`` are
    allocated lazily on first call and doubled in capacity whenever a new
    batch would not fit.

    :param items: list of torch tensors to enqueue; tensor ``k`` is appended
        to storage tensor ``self._items[k]`` along dim 0.
    :raises RuntimeError: if called after ``done_adding()``, if ``can_add()``
        is False, or if the new total size would exceed
        ``_shuffling_queue_capacity + _extra_capacity``.
    """
    if self._done_adding:
        raise RuntimeError('Can not call add_many after done_adding() was called.')
    if not self.can_add():
        raise RuntimeError('Can not enqueue. Check the return value of "can_enqueue()" to check if more '
                           'items can be added.')
    expected_size = self._size + len(items[0])
    maximal_capacity = self._shuffling_queue_capacity + self._extra_capacity
    if expected_size > maximal_capacity:
        raise RuntimeError('Attempt to enqueue more elements than the capacity allows. '
                           'Current size: {}, new size {}, maximum allowed: {}'.format(self._size, expected_size,
                                                                                       maximal_capacity))
    # Start from the base capacity (not the current buffer size) and double
    # until the post-append size fits; the resize below only triggers when
    # this exceeds the currently allocated capacity.
    new_capacity = self._shuffling_queue_capacity
    while new_capacity < expected_size:
        # Will double capacity until it is large enough to fit new batch
        new_capacity *= 2
    if self._items is None:
        # Create Buffer: one storage tensor per field, matching each field's
        # trailing shape, dtype and device.
        self._items = []
        for v in items:
            self._items.append(torch.empty((new_capacity,) + v.shape[1:], dtype=v.dtype, device=v.device))
    if self.next_sample_head > 0:
        # Before we can append a new batch, we compress the remaining samples:
        # gather the not-yet-consumed entries (as permuted by _random_indices,
        # starting at next_sample_head) and compact them to the front of the
        # buffer, then discard the stale permutation.
        # NOTE(review): this reads `self.size` while the rest of the method
        # uses `self._size` — presumably `size` is a property defined on the
        # enclosing class returning `_size`; confirm against the full class.
        for k, v in enumerate(self._items):
            # We need to clone the right-side to avoid racing conditions
            # (the gather reads from the same tensor the slice writes into).
            self._items[k][:self.size] = self._items[k][self._random_indices[self.next_sample_head:]].clone()
        self._random_indices = None
        self.next_sample_head = 0
    if new_capacity > self._items[0].shape[0]:
        # Grow each storage tensor: allocate at the new capacity and copy the
        # currently held prefix over.
        for k, v in enumerate(self._items):
            self._items[k] = torch.empty((new_capacity,) + v.shape[1:], dtype=v.dtype, device=v.device)
            self._items[k][:self._size] = v[:self._size]
    # Copy new items over
    for k, v in enumerate(items):
        self._items[k][self._size:expected_size] = v
    self._size = expected_size