in petastorm/arrow_reader_worker.py [0:0]
def _load_rows(self, pq_file, piece, shuffle_row_drop_range):
    """Read every row of a parquet piece, applying the optional transform spec.

    Returns a pyarrow Table whose columns match ``self._transformed_schema``
    when a transform spec is configured, or the raw read result otherwise.
    """
    schema_columns = {field.name for field in self._schema.fields.values()}
    table = self._read_with_shuffle_row_drop(piece, pq_file, schema_columns, shuffle_row_drop_range)

    # No transform spec configured: hand back the raw arrow table untouched.
    if not self._transform_spec:
        return table

    df = table.to_pandas()
    # A user may omit `func` if they intend only to delete fields via the TransformSpec.
    transformed = self._transform_spec.func(df) if self._transform_spec.func else df

    # Drop any field named in removed_fields that the transform function left behind.
    # This allows transform-spec objects such as:
    #   TransformSpec(removed_fields=['some field'])
    for stale_column in set(transformed.columns) & set(self._transform_spec.removed_fields):
        del transformed[stale_column]

    actual_columns = set(transformed.columns)
    expected_columns = {f.name for f in self._transformed_schema.fields.values()}
    if actual_columns != expected_columns:
        raise ValueError('Transformed result columns ({rc}) do not match required schema columns({sc})'
                         .format(rc=','.join(actual_columns),
                                 sc=','.join(expected_columns)))

    # pyarrow cannot store multidimensional arrays, so flatten any field whose
    # schema declares more than one dimension; it is reshaped back downstream.
    # (f=field binds the loop variable eagerly to avoid the late-binding pitfall.)
    for field in self._transformed_schema.fields.values():
        if len(field.shape) > 1:
            transformed[field.name] = transformed[field.name] \
                .map(lambda cell, f=field: self._check_shape_and_ravel(cell, f))

    return pa.Table.from_pandas(transformed, preserve_index=False)