def get_results()

in petastorm/workers_pool/process_pool.py [0:0]


    def get_results(self):
        """Wait for the next result published by a worker and return it.

        The method loops until one of three things happens: a data message arrives (it is
        deserialized and returned), a worker reports an exception (the pool is stopped, joined
        and the exception is re-raised), or there is nothing left to wait for.

        Each incoming message is read as two frames: a payload frame handed to
        ``self._serializer`` and a pickled control frame. A falsy control object marks the
        message as a data message; otherwise the control object is either a
        :class:`VentilatedItemProcessedMessage` or an exception raised by a worker.

        :return: the value produced by ``self._serializer.deserialize(...)`` for the payload frame
            of the first data message received.
        :raises EmptyResultError: when the number of ventilated items equals the number of processed
            items and either no ventilator is in use or the ventilator reports it has completed.
        :raises Exception: any exception instance received from a worker is re-raised after
            ``self.stop()`` and ``self.join()`` have been called.
        """

        while True:
            # Evaluated on every pass (regardless of log level), same as the attribute reads below.
            if self._ventilator:
                ventilator_state = str(self._ventilator.completed())
            else:
                ventilator_state = 'N/A'
            logger.debug('ventilated_items=%d ventilated_items_processed=%d ventilator.completed=%s',
                         self._ventilated_items, self._ventilated_items_processed,
                         ventilator_state)

            # Nothing is outstanding once every ventilated item has been acknowledged and the
            # ventilator (if one is in use) has no more work to hand out.
            all_items_processed = self._ventilated_items == self._ventilated_items_processed
            if all_items_processed and (not self._ventilator or self._ventilator.completed()):
                logger.debug('ventilator reported it has completed. Reporting end of results')
                raise EmptyResultError()

            logger.debug('get_results polling on the next result')
            # Poll with a bounded timeout so the end-of-results condition above is re-checked
            # periodically. NOTE(review): the * 1e3 presumably converts seconds to the
            # milliseconds expected by the poller - confirm against the constant's definition.
            poll_timeout = _VERIFY_END_OF_VENTILATION_PERIOD * 1e3
            ready_sockets = self._results_receiver_poller.poll(poll_timeout)
            if not ready_sockets:
                continue

            # First frame: payload for the serializer. Second frame: pickled control object.
            payload_frame, control_frame = self._results_receiver.recv_multipart(copy=self._zmq_copy_buffers)
            control_message = pickle.loads(control_frame)

            if not control_message:
                # No control object: this message carries an actual result.
                logger.debug('get_results received new results')
                # When frames are not copied, the payload is accessed through its ``buffer`` attribute.
                raw_payload = payload_frame if self._zmq_copy_buffers else payload_frame.buffer
                return self._serializer.deserialize(raw_payload)

            logger.debug('get_results a pickled message %s', type(control_message))
            if isinstance(control_message, VentilatedItemProcessedMessage):
                # A worker finished one ventilated item: update the bookkeeping and keep waiting.
                self._ventilated_items_processed += 1
                if self._ventilator:
                    self._ventilator.processed_item()
            elif isinstance(control_message, Exception):
                # A worker failed: shut the pool down before surfacing the error to the caller.
                self.stop()
                self.join()
                raise control_message