in petastorm/workers_pool/process_pool.py [0:0]
def get_results(self):
    """Block until the next data result is available and return it.

    The method loops over incoming two-part messages. Control messages are consumed internally:
    a :class:`VentilatedItemProcessedMessage` increments the processed-items counter (and notifies the
    ventilator when one is attached); an :class:`Exception` instance stops and joins the pool and is then
    re-raised to the caller. Any message whose control part is empty is treated as a data payload.

    No timeout argument is accepted: the receiver is polled repeatedly, and the end-of-work condition is
    re-evaluated before every poll.

    :return: arguments passed to ``publish_func(...)`` by a worker, deserialized with ``self._serializer``.
    :raises EmptyResultError: when every ventilated item has been reported as processed and either no
      ventilator is attached or the ventilator reports completion.
    """
    while True:
        logger.debug('ventilated_items=%d ventilated_items_processed=%d ventilator.completed=%s',
                     self._ventilated_items, self._ventilated_items_processed,
                     str(self._ventilator.completed()) if self._ventilator else 'N/A')

        # Nothing left to wait for once all ventilated items were processed and the ventilator
        # (when there is one) has no more work to hand out.
        all_processed = self._ventilated_items == self._ventilated_items_processed
        if all_processed and (not self._ventilator or self._ventilator.completed()):
            logger.debug('ventilator reported it has completed. Reporting end of results')
            raise EmptyResultError()

        logger.debug('get_results polling on the next result')
        # Poll with a bounded wait so the end-of-work check above is re-run periodically.
        # NOTE(review): the 1e3 factor presumably converts seconds to milliseconds - confirm.
        poll_timeout = _VERIFY_END_OF_VENTILATION_PERIOD * 1e3
        ready = self._results_receiver_poller.poll(poll_timeout)
        if not ready:
            continue

        # Each message has two parts: the data payload (handled by self._serializer) and a pickled
        # control object, which is falsy when the message carries data.
        # NOTE(review): pickle.loads trusts the sender; presumably only this pool's own workers
        # publish to this receiver - confirm.
        payload_part, control_part = self._results_receiver.recv_multipart(copy=self._zmq_copy_buffers)
        control_message = pickle.loads(control_part)

        if not control_message:
            logger.debug('get_results received new results')
            # When buffers are not copied, the payload part exposes its data through ``.buffer``.
            raw_payload = payload_part if self._zmq_copy_buffers else payload_part.buffer
            return self._serializer.deserialize(raw_payload)

        logger.debug('get_results a pickled message %s', type(control_message))
        if isinstance(control_message, VentilatedItemProcessedMessage):
            self._ventilated_items_processed += 1
            if self._ventilator:
                self._ventilator.processed_item()
        elif isinstance(control_message, Exception):
            # Shut the pool down before surfacing the received exception to the caller.
            self.stop()
            self.join()
            raise control_message