def get_results()

in petastorm/workers_pool/thread_pool.py [0:0]


    def get_results(self):
        """Returns results from worker pool or re-raise worker's exception if any happen in worker thread.

        :param timeout: If None, will block forever, otherwise will raise :class:`.TimeoutWaitingForResultError`
            exception if no data received within the timeout (in seconds)

        :return: arguments passed to ``publish_func(...)`` by a worker. If no more results are anticipated,
                 :class:`.EmptyResultError`.
        """

        while True:
            # If there is no more work to do, raise an EmptyResultError
            if self._results_queue.empty() and self._ventilated_items == self._ventilated_items_processed:
                # We also need to check if we are using a ventilator and if it is completed
                if not self._ventilator or self._ventilator.completed():
                    raise EmptyResultError()

            try:
                result = self._results_queue.get(timeout=_VERIFY_END_OF_VENTILATION_PERIOD)
                if isinstance(result, VentilatedItemProcessedMessage):
                    self._ventilated_items_processed += 1
                    if self._ventilator:
                        self._ventilator.processed_item()
                    continue
                elif isinstance(result, Exception):
                    self.stop()
                    self.join()
                    raise result
                else:
                    return result
            except queue.Empty:
                continue