in petastorm/workers_pool/thread_pool.py [0:0]
def start(self, worker_class, worker_args=None, ventilator=None):
    """Starts worker threads.

    :param worker_class: A class of the worker class. The class will be instantiated in the worker
        thread. The class must implement :class:`.WorkerBase` protocol.
    :param worker_args: Argument that will be passed to ``args`` property of the instantiated
        :class:`.WorkerBase`.
    :param ventilator: Optional ventilator feeding work into the pool; it is started only after
        all worker threads are running.
    :return: ``None``
    :raises RuntimeError: If the pool was already stopped - a ThreadPool instance cannot be reused.
    """
    # A pool whose stop event is already set has been shut down; refuse to restart it.
    if self._stop_event.is_set():
        raise RuntimeError('ThreadPool({}) cannot be reused! stop_event set? {}'
                           .format(len(self._workers), self._stop_event.is_set()))

    # Set up a channel for each worker to receive work items.
    self._ventilator_queues = [queue.Queue() for _ in range(self.workers_count)]
    # Set up a bounded channel for each worker to send results. The floor of 5 keeps the
    # per-worker capacity from collapsing to 0 when workers_count is large relative to
    # the configured total results queue size.
    self._results_queues = [
        queue.Queue(max(5, self._results_queue_size // self.workers_count))
        for _ in range(self.workers_count)
    ]

    self._workers = []
    for worker_id in range(self.workers_count):
        # Bind worker_id as a lambda default argument so each worker publishes to its own
        # queue (avoids the late-binding closure pitfall inside a loop).
        publish_func = lambda data, _wid=worker_id: self._stop_aware_put(_wid, data)
        worker_impl = worker_class(worker_id, publish_func, worker_args)
        new_thread = WorkerThread(worker_impl, self._stop_event, self._ventilator_queues[worker_id],
                                  self._results_queues[worker_id], self._profiling_enabled)
        # Make the thread daemonic. Since it only reads it's ok to abort while running - no
        # resource corruption will occur.
        new_thread.daemon = True
        self._workers.append(new_thread)

    # Spin up all worker threads; the ventilator (if any) is started only afterwards.
    for w in self._workers:
        w.start()

    if ventilator:
        self._ventilator = ventilator
        self._ventilator.start()