in petastorm/workers_pool/ventilator.py [0:0]
def __init__(self,
             ventilate_fn,
             items_to_ventilate,
             iterations=1,
             randomize_item_order=False,
             max_ventilation_queue_size=None,
             ventilation_interval=_VENTILATION_INTERVAL):
    """
    Validate the arguments and initialize the state of a concurrent ventilator.

    :param ventilate_fn: Callable invoked to ventilate an item; usually the worker pool ventilate function. It is
        handed to the base class constructor.
    :param items_to_ventilate: (``list[dict]``) Items to ventilate. Every item is a ``dict`` holding the
        ``**kwargs`` eventually passed to a worker process function.
    :param iterations: (``int`` or ``None``) Number of passes over ``items_to_ventilate`` that should be
        ventilated to the worker pool, e.g. ``2`` ventilates every item twice. ``None`` means the ventilator
        keeps ventilating forever.
    :param randomize_item_order: (``bool``) Whether the order of ``items_to_ventilate`` is randomized. The
        randomization is done on every individual iteration.
    :param max_ventilation_queue_size: (``int``) Upper bound on the number of items stored in the ventilation
        queue; larger values mean higher potential memory requirements. A falsy value (``None`` or ``0``)
        selects the default, ``len(items_to_ventilate)``, since that many items can definitely be held in
        memory.
    :param ventilation_interval: (``float``, seconds) Time between checks on whether something can be
        ventilated while the ventilation queue is considered full.
    :raises ValueError: If ``iterations`` is neither ``None`` nor an integer >= 1, or if
        ``items_to_ventilate`` is not a list consisting solely of dicts.
    """
    super(ConcurrentVentilator, self).__init__(ventilate_fn)

    # ``iterations`` is checked first, so its error wins when several arguments are invalid.
    if iterations is not None:
        if not isinstance(iterations, int) or iterations < 1:
            raise ValueError('iterations must be positive integer or None')

    # The list check short-circuits, so a non-list argument is never iterated.
    is_list_of_dicts = isinstance(items_to_ventilate, list) and all(
        isinstance(item, dict) for item in items_to_ventilate)
    if not is_list_of_dicts:
        raise ValueError('items_to_ventilate must be a list of dicts')

    # Configuration taken from the caller.
    self._items_to_ventilate = items_to_ventilate
    self._randomize_item_order = randomize_item_order
    self._ventilation_interval = ventilation_interval
    self._iterations = iterations
    self._iterations_remaining = iterations

    if max_ventilation_queue_size:
        self._max_ventilation_queue_size = max_ventilation_queue_size
    else:
        # No explicit limit: default to the size of the items to ventilate.
        self._max_ventilation_queue_size = len(items_to_ventilate)

    # Bookkeeping that starts out empty/idle.
    self._current_item_to_ventilate = 0
    self._ventilated_items_count = 0
    self._processed_items_count = 0
    self._ventilation_thread = None
    self._stop_requested = False