def _load_rows()

in petastorm/arrow_reader_worker.py [0:0]


    def _load_rows(self, pq_file, piece, shuffle_row_drop_range):
        """Loads all rows from a piece"""

        column_names_in_schema = set(field.name for field in self._schema.fields.values())

        result = self._read_with_shuffle_row_drop(piece, pq_file, column_names_in_schema, shuffle_row_drop_range)

        if self._transform_spec:
            result_as_pandas = result.to_pandas()
            # A user may omit `func` value if they intend just to delete some fields using the TransformSpec
            if self._transform_spec.func:
                transformed_result = self._transform_spec.func(result_as_pandas)
            else:
                transformed_result = result_as_pandas

            # If transform function left a field that is listed in transform_spec's remove_fields, we remove it
            # ourselves. Allows for the following transform-spec objects to be created:
            # TransformSpec(removed_fields=['some field'])
            for field_to_remove in set(transformed_result.columns) & set(self._transform_spec.removed_fields):
                del transformed_result[field_to_remove]

            transformed_result_column_set = set(transformed_result.columns)
            transformed_schema_column_set = set([f.name for f in self._transformed_schema.fields.values()])

            if transformed_result_column_set != transformed_schema_column_set:
                raise ValueError('Transformed result columns ({rc}) do not match required schema columns({sc})'
                                 .format(rc=','.join(transformed_result_column_set),
                                         sc=','.join(transformed_schema_column_set)))

            # For fields return multidimensional array, we need to ravel them
            # because pyarrow do not support multidimensional array.
            # later we will reshape it back.
            for field in self._transformed_schema.fields.values():
                if len(field.shape) > 1:
                    transformed_result[field.name] = transformed_result[field.name] \
                        .map(lambda x, f=field: self._check_shape_and_ravel(x, f))

            result = pa.Table.from_pandas(transformed_result, preserve_index=False)

        return result