def start()

in petastorm/workers_pool/thread_pool.py [0:0]


    def start(self, worker_class, worker_args=None, ventilator=None):
        """Starts worker threads.

        :param worker_class: A class of the worker class. The class will be instantiated in the worker process. The
          class must implement :class:`.WorkerBase` protocol
        :param worker_setup_args: Argument that will be passed to ``args`` property of the instantiated
          :class:`.WorkerBase`
        :return: ``None``
        """
        # Verify stop_event and raise exception if it's already set!
        if self._stop_event.is_set():
            raise RuntimeError('ThreadPool({}) cannot be reused! stop_event set? {}'
                               .format(len(self._workers), self._stop_event.is_set()))

        # Set up a channel for each worker to send work
        self._ventilator_queues = [queue.Queue() for _ in range(self.workers_count)]
        # Set up a channel for each worker to send results
        self._results_queues = [
            queue.Queue(max(5, self._results_queue_size // self.workers_count))
            for _ in range(self.workers_count)
        ]
        self._workers = []
        for worker_id in range(self.workers_count):
            # Create a closure that captures the worker_id for this specific worker
            def make_publish_func(worker_id):
                return lambda data: self._stop_aware_put(worker_id, data)

            worker_impl = worker_class(worker_id, make_publish_func(worker_id), worker_args)
            new_thread = WorkerThread(worker_impl, self._stop_event, self._ventilator_queues[worker_id],
                                      self._results_queues[worker_id], self._profiling_enabled)
            # Make the thread daemonic. Since it only reads it's ok to abort while running - no resource corruption
            # will occur.
            new_thread.daemon = True
            self._workers.append(new_thread)

        # Spin up all worker threads
        for w in self._workers:
            w.start()

        if ventilator:
            self._ventilator = ventilator
            self._ventilator.start()