in petastorm/workers_pool/ventilator.py [0:0]
def _ventilate(self):
# Randomize the item order before starting the ventilation if randomize_item_order is set
if self._randomize_item_order:
if self._random_seed is not None and self._random_seed != 0:
# Deterministic randomization: use provided seed
self._items_to_ventilate = list(self._rng.permutation(self._items_to_ventilate))
else:
# Non-deterministic randomization: use np.random
self._items_to_ventilate = list(np.random.permutation(self._items_to_ventilate))
while True:
# Stop condition is when no iterations are remaining or there are no items to ventilate
if self.completed():
break
# Block until queue has room, but use continue to allow for checking if stop has been called
if self._ventilated_items_count - self._processed_items_count >= self._max_ventilation_queue_size:
sleep(self._ventilation_interval)
continue
item_to_ventilate = self._items_to_ventilate[self._current_item_to_ventilate]
self._ventilate_fn(**item_to_ventilate)
self._current_item_to_ventilate += 1
self._ventilated_items_count += 1
if self._current_item_to_ventilate >= len(self._items_to_ventilate):
self._current_item_to_ventilate = 0
# If iterations was set to None, that means we will iterate until stop is called
if self._iterations_remaining is not None:
self._iterations_remaining -= 1