def run()

in petastorm/workers_pool/thread_pool.py [0:0]


    def run(self):
        """Consume ventilated work items until stopped or until processing fails.

        Every iteration waits up to ``IO_TIMEOUT_INTERVAL_S`` for an
        ``(args, kwargs)`` pair on ``self._ventilator_queue``, passes it to
        ``self._worker_impl.process`` and then publishes a
        ``VentilatedItemProcessedMessage`` through ``publish_func``.

        The loop ends when ``self._stop_event`` is set, or when processing
        raises an unexpected exception; in the latter case the traceback is
        written to stderr and the exception object is put on
        ``self._results_queue``. When ``self._profiling_enabled`` is true,
        ``self.prof`` is enabled before the loop and disabled after it.
        """
        if self._profiling_enabled:
            self.prof.enable()

        # Test the stop event ahead of every fetch to prevent erroneous reuse.
        while not self._stop_event.is_set():
            try:
                work_item = self._ventilator_queue.get(block=True, timeout=IO_TIMEOUT_INTERVAL_S)
                positional, keyword = work_item
                self._worker_impl.process(*positional, **keyword)
                self._worker_impl.publish_func(VentilatedItemProcessedMessage())
            except (queue.Empty, WorkerTerminationRequested):
                # Either nothing arrived within the timeout or a
                # WorkerTerminationRequested was raised: loop around and
                # re-check the stop event.
                continue
            except Exception as error:  # pylint: disable=broad-except
                report = 'Worker %d terminated: unexpected exception:\n' % self._worker_impl.worker_id
                sys.stderr.write(report + format_exc())
                # Put the exception on the results queue, then stop this worker.
                self._results_queue.put(error)
                break

        if self._profiling_enabled:
            self.prof.disable()