def get_results()

in petastorm/workers_pool/thread_pool.py [0:0]


    def get_results(self):
        """Returns results from worker pool or re-raise worker's exception if any happen in worker thread.

        :param timeout: If None, will block forever, otherwise will raise :class:`.TimeoutWaitingForResultError`
            exception if no data received within the timeout (in seconds)

        :return: arguments passed to ``publish_func(...)`` by a worker. If no more results are anticipated,
                 :class:`.EmptyResultError`.
        """
        # If shuffle_rows is enabled and the seed is not set, we need to use a non-blocking
        #  as we don't care about the strict round robin order
        use_non_blocking_get = self._shuffle_rows and (self._seed is None or self._seed == 0)
        while True:
            # If there is no more work to do, raise an EmptyResultError
            if self.all_workers_done():
                # We also need to check if we are using a ventilator and if it is completed
                if not self._ventilator or self._ventilator.completed():
                    raise EmptyResultError()

            # If the current worker is done, we need to get the result from the next worker
            if self.current_worker_done(self._get_results_worker_id):
                self._get_results_worker_id = (self._get_results_worker_id + 1) % self.workers_count
                continue

            try:
                # Get the result from the current worker's results queue.
                # Use blocking/strict round robin if shuffle_rows is disabled or the seed is set
                result = self._results_queues[self._get_results_worker_id].get(
                    block=not use_non_blocking_get, timeout=_VERIFY_END_OF_VENTILATION_PERIOD
                )
                # If the result is a VentilatedItemProcessedMessage, we need to increment the count of items
                # processed by the current worker
                if isinstance(result, VentilatedItemProcessedMessage):
                    self._ventilated_items_processed_by_worker[self._get_results_worker_id] += 1
                    if self._ventilator:
                        self._ventilator.processed_item()
                    # Move to the next worker
                    self._get_results_worker_id = (self._get_results_worker_id + 1) % self.workers_count
                    continue
                elif isinstance(result, Exception):
                    self.stop()
                    self.join()
                    raise result
                else:
                    return result
            except queue.Empty:
                continue