def _ventilate()

in petastorm/workers_pool/ventilator.py [0:0]


    def _ventilate(self):
        """Dispatch items to ``self._ventilate_fn`` until ``self.completed()`` is true.

        Every loop pass does exactly one of two things:

        * waits ``self._ventilation_interval`` seconds when the number of items
          ventilated but not yet processed has reached
          ``self._max_ventilation_queue_size``; or
        * ventilates the item at ``self._current_item_to_ventilate`` by calling
          ``self._ventilate_fn(**item)`` and advances the cursor.

        When the cursor runs past the end of ``self._items_to_ventilate`` it wraps
        to 0 and, unless ``self._iterations_remaining`` is ``None``, that counter is
        decremented.

        If ``self._randomize_item_order`` is set, the item list is shuffled in place
        once per pass over the list, right before the first item of that pass is
        dispatched. The shuffle is deliberately placed *after* the backpressure
        check: otherwise a full queue at the start of a pass would reshuffle the
        list on every polling cycle, wasting work and making the resulting order
        depend on how long the wait lasted even with a seeded ``random``.
        """
        # Re-evaluated on every pass (including after each sleep) so that a stop
        # request is noticed promptly. Per the original note, the stop condition is
        # "no iterations remaining or no items to ventilate".
        while not self.completed():
            # Backpressure: wait until the queue has room, but only sleep briefly
            # and loop again so the completion check above keeps being polled.
            in_flight = self._ventilated_items_count - self._processed_items_count
            if in_flight >= self._max_ventilation_queue_size:
                sleep(self._ventilation_interval)
                continue

            # Beginning of a pass over the items: randomize the order once, now
            # that we know the first item is actually going to be dispatched.
            if self._current_item_to_ventilate == 0 and self._randomize_item_order:
                random.shuffle(self._items_to_ventilate)

            # Each item is a mapping of keyword arguments for the ventilate function.
            item = self._items_to_ventilate[self._current_item_to_ventilate]
            self._ventilate_fn(**item)
            self._current_item_to_ventilate += 1
            self._ventilated_items_count += 1

            if self._current_item_to_ventilate >= len(self._items_to_ventilate):
                # Finished one full pass; wrap around for the next one.
                self._current_item_to_ventilate = 0
                # None means "iterate until stop is called", so there is nothing
                # to count down in that case.
                if self._iterations_remaining is not None:
                    self._iterations_remaining -= 1