def __init__()

in petastorm/workers_pool/ventilator.py [0:0]


    def __init__(self,
                 ventilate_fn,
                 items_to_ventilate,
                 iterations=1,
                 randomize_item_order=False,
                 max_ventilation_queue_size=None,
                 ventilation_interval=_VENTILATION_INTERVAL):
        """
        Initialize the state of a concurrent ventilator.

        :param ventilate_fn: Function called to ventilate an item; it is forwarded to the base class constructor.
                Usually the worker pool ventilate function.
        :param items_to_ventilate: (``list[dict]``) Items to ventilate. Every item is a ``dict`` holding the
                ``**kwargs`` eventually passed to a worker process function. The list is stored by reference
                (it is not copied here).
        :param iterations: (``int`` or ``None``) Number of passes made over ``items_to_ventilate``; with 2, each
                item is ventilated twice. Must be an integer >= 1. ``None`` means the ventilator keeps
                ventilating forever.
        :param randomize_item_order: (``bool``) Whether the order of ``items_to_ventilate`` is randomized. When
                enabled, this is done on every individual iteration.
        :param max_ventilation_queue_size: (``int``) Upper bound on the number of items held in the ventilation
                queue; larger values mean potentially higher memory requirements. ``None`` (or any other falsy
                value, such as ``0``) falls back to ``len(items_to_ventilate)``, since that many items can
                definitely be held in memory.
        :param ventilation_interval: (``float`` in seconds) Time between checks on whether something can be
                ventilated (when the ventilation queue is considered full).
        :raises ValueError: If ``iterations`` is neither ``None`` nor an integer >= 1, or if
                ``items_to_ventilate`` is not a list made up exclusively of dicts.
        """
        # The base class is initialized before any argument validation takes place.
        super(ConcurrentVentilator, self).__init__(ventilate_fn)

        # ``None`` is the "run forever" marker; anything else has to be an integer of at least 1.
        iterations_ok = iterations is None or (isinstance(iterations, int) and not iterations < 1)
        if not iterations_ok:
            raise ValueError('iterations must be positive integer or None')

        # The container must be a real list and each entry a dict of keyword arguments.
        items_ok = isinstance(items_to_ventilate, list) and all(
            isinstance(entry, dict) for entry in items_to_ventilate)
        if not items_ok:
            raise ValueError('items_to_ventilate must be a list of dicts')

        # A falsy queue size (``None`` or ``0``) selects the default: one slot per item to ventilate.
        if max_ventilation_queue_size:
            queue_size_limit = max_ventilation_queue_size
        else:
            queue_size_limit = len(items_to_ventilate)

        # Configuration captured from the arguments.
        self._items_to_ventilate = items_to_ventilate
        self._randomize_item_order = randomize_item_order
        self._iterations = iterations
        # Starts out equal to the requested total (``None`` when unbounded).
        self._iterations_remaining = iterations
        self._max_ventilation_queue_size = queue_size_limit
        self._ventilation_interval = ventilation_interval

        # Runtime bookkeeping, all starting from a clean slate.
        self._ventilation_thread = None
        self._stop_requested = False
        self._current_item_to_ventilate = 0
        self._ventilated_items_count = 0
        self._processed_items_count = 0