in petastorm/workers_pool/thread_pool.py [0:0]
def get_results(self):
    """Return the next result published by a worker, re-raising a worker's exception if one was reported.

    Results are read from the per-worker results queues, starting at the worker indexed by
    ``self._get_results_worker_id``. The index moves to the next worker when the current worker is reported
    done by ``current_worker_done`` or when a :class:`VentilatedItemProcessedMessage` is read from its queue.

    When ``shuffle_rows`` is disabled or a non-zero seed is set, the queue is read with a blocking ``get`` so
    that workers are consumed in strict round-robin order. When ``shuffle_rows`` is enabled and the seed is
    unset (``None`` or ``0``) the order does not matter: the queue is polled without blocking and, if it is
    empty, the next worker is tried instead.

    :return: arguments passed to ``publish_func(...)`` by a worker.
    :raises EmptyResultError: if no more results are anticipated, i.e. ``all_workers_done()`` is true and there
        is either no ventilator or the ventilator has completed.
    :raises Exception: any exception instance read from a results queue is re-raised, after ``stop()`` and
        ``join()`` have been called on the pool.
    """
    # If shuffle_rows is enabled and the seed is not set, we use a non-blocking get
    # as we don't care about the strict round robin order.
    use_non_blocking_get = self._shuffle_rows and (self._seed is None or self._seed == 0)

    while True:
        # If there is no more work to do, raise an EmptyResultError. When a ventilator is used it must have
        # completed as well, otherwise more items may still be ventilated.
        if self.all_workers_done():
            if not self._ventilator or self._ventilator.completed():
                raise EmptyResultError()

        # If the current worker is done, we need to get the result from the next worker.
        if self.current_worker_done(self._get_results_worker_id):
            self._get_results_worker_id = (self._get_results_worker_id + 1) % self.workers_count
            continue

        # Only the queue read is guarded: a ``queue.Empty`` raised here means "nothing available yet", whereas
        # an exception instance *returned* by the queue is a worker failure and must never be swallowed.
        try:
            # Blocking/strict round robin if shuffle_rows is disabled or the seed is set. The timeout bounds
            # the wait so the end-of-work checks above are re-evaluated periodically; it is ignored by
            # ``Queue.get`` when ``block`` is false.
            result = self._results_queues[self._get_results_worker_id].get(
                block=not use_non_blocking_get, timeout=_VERIFY_END_OF_VENTILATION_PERIOD
            )
        except queue.Empty:
            if use_non_blocking_get:
                # Order is irrelevant in this mode, so poll the next worker rather than spinning on a queue
                # that has nothing to offer while other workers may already have results waiting.
                self._get_results_worker_id = (self._get_results_worker_id + 1) % self.workers_count
            continue

        if isinstance(result, VentilatedItemProcessedMessage):
            # The current worker finished one ventilated item: account for it and notify the ventilator.
            self._ventilated_items_processed_by_worker[self._get_results_worker_id] += 1
            if self._ventilator:
                self._ventilator.processed_item()
            # Move to the next worker
            self._get_results_worker_id = (self._get_results_worker_id + 1) % self.workers_count
            continue

        if isinstance(result, Exception):
            # A worker reported a failure: shut the pool down before propagating it to the caller.
            self.stop()
            self.join()
            raise result

        return result