in petastorm/arrow_reader_worker.py
def read_next(self, workers_pool, schema, ngram):
    try:
        assert not ngram, 'ArrowReader does not support ngrams for now'

        result_table = workers_pool.get_results()

        # Convert arrow table columns into numpy. Strings are handled differently since to_pandas() returns
        # a numpy array of dtype=object.
        result_dict = dict()
        for column_name, column in compat_table_columns_gen(result_table):
            # Assume we get only one chunk since the reader worker reads one row group at a time.
            # `to_pandas` is slower when called on the entire column data rather than directly on a chunk.
            if compat_column_data(result_table.column(0)).num_chunks == 1:
                column_as_pandas = column.data.chunks[0].to_pandas()
            else:
                column_as_pandas = column.data.to_pandas()

            # pyarrow < 0.15.0 always returns a numpy array. Starting with 0.15 we get a pandas Series,
            # hence we convert it into a numpy array.
            if isinstance(column_as_pandas, pd.Series):
                column_as_numpy = column_as_pandas.values
            else:
                column_as_numpy = column_as_pandas

            if pa.types.is_string(column.type):
                result_dict[column_name] = column_as_numpy.astype(np.unicode_)
            elif pa.types.is_list(column.type):
                # Assuming all lists are of the same length, we can collate them into a matrix.
                list_of_lists = column_as_numpy
                try:
                    col_data = np.vstack(list_of_lists.tolist())
                    shape = schema.fields[column_name].shape
                    if len(shape) > 1:
                        col_data = col_data.reshape((len(list_of_lists),) + shape)
                    result_dict[column_name] = col_data
                except ValueError:
                    raise RuntimeError('All values in column \'{}\' are expected to have the same length. '
                                       'Got the following set of lengths: \'{}\''
                                       .format(column_name,
                                               ', '.join(str(value.shape[0]) for value in list_of_lists)))
            else:
                result_dict[column_name] = column_as_numpy

        return schema.make_namedtuple(**result_dict)

    except EmptyResultError:
        raise StopIteration
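
The `pa.types.is_list` branch above collates a column of equal-length lists into one ndarray and, for multi-dimensional fields, reshapes it back to the per-row shape declared in the schema. A minimal, standalone sketch of that step (not part of the petastorm source; `list_of_lists` mimics what `to_pandas()` yields for a list column, and `field_shape` is a hypothetical stand-in for `schema.fields[column_name].shape`):

import numpy as np

# Two rows, each holding a flat list of 6 values, stored as a 1-D object array
# the way a list column comes back from to_pandas().
list_of_lists = np.empty(2, dtype=object)
list_of_lists[0] = np.arange(6)
list_of_lists[1] = np.arange(6, 12)

field_shape = (2, 3)  # hypothetical multi-dimensional field shape from the schema

# Stack the per-row lists into a (num_rows, flat_length) matrix.
col_data = np.vstack(list_of_lists.tolist())          # shape: (2, 6)

# For fields with more than one dimension, restore the per-row shape.
if len(field_shape) > 1:
    col_data = col_data.reshape((len(list_of_lists),) + field_shape)  # shape: (2, 2, 3)

print(col_data.shape)  # (2, 2, 3)

If any row's list has a different length, `np.vstack` raises `ValueError`, which is what the method above converts into the more descriptive `RuntimeError`.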