in petastorm/workers_pool/thread_pool.py [0:0]
def get_results(self):
"""Returns results from worker pool or re-raise worker's exception if any happen in worker thread.
:param timeout: If None, will block forever, otherwise will raise :class:`.TimeoutWaitingForResultError`
exception if no data received within the timeout (in seconds)
:return: arguments passed to ``publish_func(...)`` by a worker. If no more results are anticipated,
:class:`.EmptyResultError`.
"""
while True:
# If there is no more work to do, raise an EmptyResultError
if self._results_queue.empty() and self._ventilated_items == self._ventilated_items_processed:
# We also need to check if we are using a ventilator and if it is completed
if not self._ventilator or self._ventilator.completed():
raise EmptyResultError()
try:
result = self._results_queue.get(timeout=_VERIFY_END_OF_VENTILATION_PERIOD)
if isinstance(result, VentilatedItemProcessedMessage):
self._ventilated_items_processed += 1
if self._ventilator:
self._ventilator.processed_item()
continue
elif isinstance(result, Exception):
self.stop()
self.join()
raise result
else:
return result
except queue.Empty:
continue