in petastorm/arrow_reader_worker.py [0:0]
def _load_rows(self, pq_file, piece, shuffle_row_drop_range):
    """Load all rows of a piece and apply the optional transform spec to them.

    The piece is read through ``self._read_with_shuffle_row_drop``, which is given the names of all fields
    declared in ``self._schema``. When ``self._transform_spec`` is set, the loaded data is converted to a
    pandas DataFrame, passed through the spec's ``func`` (if any), stripped of the spec's ``removed_fields``,
    validated against ``self._transformed_schema`` and converted back with ``pa.Table.from_pandas``.

    :param pq_file: Forwarded unchanged to ``self._read_with_shuffle_row_drop``.
    :param piece: Forwarded unchanged to ``self._read_with_shuffle_row_drop``.
    :param shuffle_row_drop_range: Forwarded unchanged to ``self._read_with_shuffle_row_drop``.
    :return: The loaded (and possibly transformed) table, or the value returned by
      ``convert_arrow_table_to_numpy_dict`` when ``self._convert_early_to_numpy`` is truthy.
    :raises ValueError: If, after the transform, the set of columns differs from the set of field names
      of ``self._transformed_schema``.
    """
    column_names_in_schema = {field.name for field in self._schema.fields.values()}

    result = self._read_with_shuffle_row_drop(piece, pq_file, column_names_in_schema, shuffle_row_drop_range)

    if self._transform_spec:
        result_as_pandas = result.to_pandas()
        # A user may omit `func` value if they intend just to delete some fields using the TransformSpec
        if self._transform_spec.func:
            transformed_result = self._transform_spec.func(result_as_pandas)
        else:
            transformed_result = result_as_pandas

        # If the transform function left a field that is listed in transform_spec's removed_fields, we remove
        # it ourselves. Allows for the following transform-spec objects to be created:
        # TransformSpec(removed_fields=['some field'])
        for field_to_remove in set(transformed_result.columns) & set(self._transform_spec.removed_fields):
            del transformed_result[field_to_remove]

        transformed_result_column_set = set(transformed_result.columns)
        transformed_schema_column_set = {f.name for f in self._transformed_schema.fields.values()}

        if transformed_result_column_set != transformed_schema_column_set:
            # Column labels are stringified and sorted before joining: labels produced by a transform are
            # not guaranteed to be strings (str.join would raise TypeError and hide this error), and set
            # iteration order would otherwise make the message non-deterministic.
            raise ValueError('Transformed result columns ({rc}) do not match required schema columns({sc})'
                             .format(rc=','.join(sorted(map(str, transformed_result_column_set))),
                                     sc=','.join(sorted(map(str, transformed_schema_column_set)))))

        # Fields whose schema shape has more than one dimension hold multidimensional arrays. We need to
        # ravel them because pyarrow does not support multidimensional arrays; they are reshaped back later.
        for field in self._transformed_schema.fields.values():
            if len(field.shape) > 1:
                # `f=field` binds the current field at lambda creation time (avoids late binding).
                transformed_result[field.name] = transformed_result[field.name] \
                    .map(lambda x, f=field: self._check_shape_and_ravel(x, f))

        result = pa.Table.from_pandas(transformed_result, preserve_index=False)

    # If convert_early_to_numpy is enabled, convert to a numpy dict here, using the schema that describes
    # the data as it looks after the (optional) transform.
    if self._convert_early_to_numpy:
        schema_to_use = self._transformed_schema if self._transform_spec else self._schema
        return convert_arrow_table_to_numpy_dict(result, schema_to_use)

    return result