in petastorm/workers_pool/thread_pool.py [0:0]
def run(self):
if self._profiling_enabled:
self.prof.enable()
# Loop and accept messages from both channels, acting accordingly
while True:
# Check for stop event first to prevent erroneous reuse
if self._stop_event.is_set():
break
# If the message came from work_receiver channel
try:
(args, kargs) = self._ventilator_queue.get(block=True, timeout=IO_TIMEOUT_INTERVAL_S)
self._worker_impl.process(*args, **kargs)
self._worker_impl.publish_func(VentilatedItemProcessedMessage())
except queue.Empty:
pass
except WorkerTerminationRequested:
pass
except Exception as e: # pylint: disable=broad-except
stderr_message = 'Worker %d terminated: unexpected exception:\n' % self._worker_impl.worker_id
stderr_message += format_exc()
sys.stderr.write(stderr_message)
self._results_queue.put(e)
break
if self._profiling_enabled:
self.prof.disable()