in petastorm/workers_pool/dummy_pool.py [0:0]
def get_results(self):
"""Returns results
The processing is done on the get_results caller thread if the results queue is empty
:return: arguments passed to publish_func(...) by a worker
"""
if self._results_queue:
# We have already calculated result. Just return it
return self._results_queue.pop(0)
else:
# If we don't have any tasks waiting for processing, then indicate empty queue
while self._ventilator_queue or (self._ventilator and not self._ventilator.completed()):
# To prevent a race condition of the ventilator working but not yet placing an item
# on the ventilator queue. We block until something is on the ventilator queue.
while not self._ventilator_queue:
sleep(.1)
# If we do have some tasks, then process a task from the head of a queue
args, kargs = self._ventilator_queue.pop(0)
self._worker.process(*args, **kargs)
if self._ventilator:
self._ventilator.processed_item()
if self._results_queue:
return self._results_queue.pop(0)
raise EmptyResultError()