in petastorm/workers_pool/ventilator.py [0:0]
def _ventilate(self):
    """Feed items to ``self._ventilate_fn`` until ``self.completed()`` is true.

    Each loop pass does, in order:

    1. Exit if ``self.completed()`` returns a truthy value.
    2. If the number of in-flight items (``_ventilated_items_count`` minus
       ``_processed_items_count``) has reached ``_max_ventilation_queue_size``,
       sleep for ``_ventilation_interval`` and start over, so that the
       completion check is re-evaluated while waiting.
    3. If this is the first item of a pass over ``_items_to_ventilate`` and
       ``_randomize_item_order`` is set, shuffle the items in place.
    4. Call ``self._ventilate_fn(**item)`` with the current item (each item
       is unpacked as keyword arguments) and advance the counters.
    5. When the end of the list is reached, wrap around to index 0 and, if
       ``_iterations_remaining`` is not ``None``, decrement it.
    """
    while True:
        # Stop condition is delegated to completed(); per the original notes it
        # covers "no iterations remaining" and "no items to ventilate".
        if self.completed():
            break

        # Wait until the queue has room. `continue` (rather than blocking
        # indefinitely) lets the completion check above run on every poll.
        #
        # This check deliberately comes BEFORE the shuffle below: otherwise a
        # full queue at the start of a pass would re-shuffle the whole item
        # list on every poll, which wastes work and makes the resulting order
        # depend on how long we had to wait (breaking seeded reproducibility).
        in_flight = self._ventilated_items_count - self._processed_items_count
        if in_flight >= self._max_ventilation_queue_size:
            sleep(self._ventilation_interval)
            continue

        # About to ventilate the first item of a pass: shuffle exactly once if
        # randomized ordering was requested.
        if self._current_item_to_ventilate == 0 and self._randomize_item_order:
            random.shuffle(self._items_to_ventilate)

        item_to_ventilate = self._items_to_ventilate[self._current_item_to_ventilate]
        self._ventilate_fn(**item_to_ventilate)
        self._current_item_to_ventilate += 1
        self._ventilated_items_count += 1

        # End of the current pass: wrap around and account for the iteration.
        if self._current_item_to_ventilate >= len(self._items_to_ventilate):
            self._current_item_to_ventilate = 0
            # If iterations was set to None, we keep iterating until
            # completed() tells us to stop.
            if self._iterations_remaining is not None:
                self._iterations_remaining -= 1