def read_next()

in petastorm/py_dict_reader_worker.py [0:0]


    def read_next(self, workers_pool, schema, ngram):
        """Return the next decoded row, refilling the internal buffer when it is empty.

        Results are fetched from ``workers_pool`` a chunk at a time, converted to
        namedtuples and cached in ``self._result_buffer``; each call hands out a
        single cached item. All buffer access happens while holding
        ``self._result_buffer_lock``.

        :param workers_pool: object whose ``get_results()`` returns the next chunk of rows.
        :param schema: object providing ``make_namedtuple(**row)``; also passed to
            ``ngram.get_schema_at_timestep``.
        :param ngram: when truthy, every chunk item is treated as a mapping of
            timestep -> row and each row is converted with the schema returned by
            ``ngram.get_schema_at_timestep(schema, timestep)``. When falsy, every
            chunk item is converted directly with ``schema``.
        :return: a namedtuple, or (ngram case) the mapping of timestep -> namedtuple.
        :raises StopIteration: when ``EmptyResultError`` is raised while fetching or
            converting (presumably the pool's "no more results" signal -- confirm
            against the pool implementation).
        """
        try:
            with self._result_buffer_lock:
                # Fast path: something is still cached from an earlier chunk.
                if self._result_buffer:
                    return self._result_buffer.pop()

                # The chunk is stored back-to-front so that pop() -- O(1) at the tail of
                # a list -- still yields items in the order the worker produced them.
                chunk = workers_pool.get_results()
                pending = list(reversed(chunk))

                if not ngram:
                    self._result_buffer = [schema.make_namedtuple(**fields) for fields in pending]
                else:
                    # Convert each window in place: only values of existing keys are
                    # replaced, so the mapping keeps its size while we iterate over it.
                    for window in pending:
                        for timestep in window.keys():
                            fields = window[timestep]
                            timestep_schema = ngram.get_schema_at_timestep(schema, timestep)
                            window[timestep] = timestep_schema.make_namedtuple(**fields)
                    self._result_buffer = pending

                return self._result_buffer.pop()

        except EmptyResultError:
            raise StopIteration