def read_next()

in petastorm/arrow_reader_worker.py [0:0]


    def read_next(self, workers_pool, schema, ngram):
        """Fetch the next result table from the workers pool and return it as a namedtuple of numpy arrays.

        Each column of the arrow table returned by ``workers_pool.get_results()`` is converted to a numpy array:
        string columns are cast to a numpy unicode dtype, list columns are stacked into a single matrix (and
        reshaped to the schema field shape when that shape has more than one dimension), and all other columns
        are passed through as returned by ``to_pandas()``.

        :param workers_pool: Object whose ``get_results()`` returns the next arrow table.
        :param schema: Object providing ``fields[column_name].shape`` (used for list columns) and
          ``make_namedtuple(**columns)`` (used to build the return value).
        :param ngram: Must be falsy; ngrams are not supported by this reader.
        :return: The result of ``schema.make_namedtuple`` called with one numpy array per table column.
        :raises StopIteration: If ``EmptyResultError`` is raised (presumably by ``workers_pool.get_results()``
          once there are no more results).
        :raises RuntimeError: If stacking/reshaping the values of a list column raises ``ValueError``.
        """
        try:
            assert not ngram, 'ArrowReader does not support ngrams for now'

            result_table = workers_pool.get_results()

            # Convert arrow table columns into numpy. Strings are handled differently since to_pandas() returns
            # numpy array of dtype=object.
            result_dict = dict()
            for column_name, column in compat_table_columns_gen(result_table):
                # Look at the chunks of the column being converted, not of the table's first column: taking
                # `chunks[0]` is only correct when *this* column has exactly one chunk, otherwise rows would be
                # silently dropped.
                column_data = compat_column_data(column)

                # We expect only one chunk since reader worker reads one rowgroup at a time.
                # `to_pandas` works slower when called on the entire `data` rather directly on a chunk.
                if column_data.num_chunks == 1:
                    column_as_pandas = column_data.chunks[0].to_pandas()
                else:
                    column_as_pandas = column_data.to_pandas()

                # pyarrow < 0.15.0 would always return a numpy array. Starting 0.15 we get pandas series, hence we
                # convert it into numpy array
                if isinstance(column_as_pandas, pd.Series):
                    column_as_numpy = column_as_pandas.values
                else:
                    column_as_numpy = column_as_pandas

                if pa.types.is_string(column.type):
                    # NOTE(review): `np.unicode_` was removed in NumPy 2.0 (`np.str_` is the Python 3 equivalent)
                    # -- confirm which NumPy/Python versions must be supported before changing it.
                    result_dict[column_name] = column_as_numpy.astype(np.unicode_)
                elif pa.types.is_list(column.type):
                    # Assuming all lists are of the same length, hence we can collate them into a matrix
                    list_of_lists = column_as_numpy
                    try:
                        col_data = np.vstack(list_of_lists.tolist())
                        shape = schema.fields[column_name].shape
                        # Multi-dimensional fields are stored flattened; restore the per-row shape.
                        if len(shape) > 1:
                            col_data = col_data.reshape((len(list_of_lists),) + shape)
                        result_dict[column_name] = col_data

                    except ValueError:
                        raise RuntimeError('Length of all values in column \'{}\' are expected to be the same length. '
                                           'Got the following set of lengths: \'{}\''
                                           .format(column_name,
                                                   ', '.join(str(value.shape[0]) for value in list_of_lists)))
                else:
                    result_dict[column_name] = column_as_numpy

            return schema.make_namedtuple(**result_dict)

        except EmptyResultError:
            # Signal exhaustion to the caller using the iterator protocol.
            raise StopIteration