def _ventilate()

in petastorm/workers_pool/ventilator.py [0:0]


    def _ventilate(self):
        # Randomize the item order before starting the ventilation if randomize_item_order is set
        if self._randomize_item_order:
            if self._random_seed is not None and self._random_seed != 0:
                # Deterministic randomization: use provided seed
                self._items_to_ventilate = list(self._rng.permutation(self._items_to_ventilate))
            else:
                # Non-deterministic randomization: use np.random
                self._items_to_ventilate = list(np.random.permutation(self._items_to_ventilate))

        while True:
            # Stop condition is when no iterations are remaining or there are no items to ventilate
            if self.completed():
                break

            # Block until queue has room, but use continue to allow for checking if stop has been called
            if self._ventilated_items_count - self._processed_items_count >= self._max_ventilation_queue_size:
                sleep(self._ventilation_interval)
                continue

            item_to_ventilate = self._items_to_ventilate[self._current_item_to_ventilate]
            self._ventilate_fn(**item_to_ventilate)
            self._current_item_to_ventilate += 1
            self._ventilated_items_count += 1

            if self._current_item_to_ventilate >= len(self._items_to_ventilate):
                self._current_item_to_ventilate = 0
                # If iterations was set to None, that means we will iterate until stop is called
                if self._iterations_remaining is not None:
                    self._iterations_remaining -= 1