def _load_rows_with_predicate()

in petastorm/arrow_reader_worker.py [0:0]


    def _load_rows_with_predicate(self, pq_file, piece, worker_predicate, shuffle_row_drop_partition):
        """Load all rows from a parquet piece that match ``worker_predicate``.

        To avoid reading columns that may not be needed, the load is done in two passes:

        1. Read only the columns the predicate needs.
        2. Evaluate the predicate. If nothing matches, exit early with an empty list.
        3. Read the remaining columns for the surviving rows and merge the two frames.

        :param pq_file: An opened parquet file object.
        :param piece: The parquet piece (row group) to read.
        :param worker_predicate: Predicate object; ``get_fields()`` names the columns it needs
            and ``do_include(df)`` returns a boolean mask of rows to keep.
        :param shuffle_row_drop_partition: Partition spec forwarded to
            ``_read_with_shuffle_row_drop``.
        :return: A ``pyarrow.Table`` with matching rows (post transform-spec, if any),
            or an empty list if no rows matched.
        :raises ValueError: If the predicate declares no fields, or declares fields that
            are not part of the schema.
        """
        # Split all column names into ones that are needed by the predicate and the rest.
        predicate_column_names = set(worker_predicate.get_fields())

        if not predicate_column_names:
            # NOTE: the predicate API method is get_fields(); report the correct name.
            raise ValueError('At least one field name must be returned by predicate\'s get_fields() method')

        all_schema_names = set(field.name for field in self._schema.fields.values())

        invalid_column_names = predicate_column_names - all_schema_names
        if invalid_column_names:
            raise ValueError('At least some column names requested by the predicate ({}) '
                             'are not valid schema names: ({})'.format(', '.join(invalid_column_names),
                                                                       ', '.join(all_schema_names)))

        # Split into 'columns for predicate evaluation' and 'other columns'. We load 'other columns' only if at
        # least one row in the rowgroup matched the predicate
        other_column_names = all_schema_names - predicate_column_names

        # Pass 1: read only the columns needed to evaluate the predicate.
        predicates_table = self._read_with_shuffle_row_drop(piece, pq_file, predicate_column_names,
                                                            shuffle_row_drop_partition)

        predicates_data_frame = predicates_table.to_pandas()

        match_predicate_mask = worker_predicate.do_include(predicates_data_frame)
        erase_mask = match_predicate_mask.map(operator.not_)

        # Don't have anything left after filtering? Exit early.
        if erase_mask.all():
            return []

        # Null-out non-matching rows (rather than dropping them) so that row indices stay
        # aligned with the second read below, allowing an index-based merge.
        predicates_data_frame[erase_mask] = None

        if other_column_names:
            # Pass 2: read the remaining columns and null-out the same non-matching rows.
            other_table = self._read_with_shuffle_row_drop(piece, pq_file, other_column_names,
                                                           shuffle_row_drop_partition)
            other_data_frame = other_table.to_pandas()
            other_data_frame[erase_mask] = None

            # Partition-by columns will appear in both other and predicate data frames. Deduplicate.
            columns_from_predicates = predicates_data_frame.columns.difference(other_data_frame.columns)
            result_data_frame = pd.merge(predicates_data_frame[columns_from_predicates], other_data_frame,
                                         copy=False, left_index=True, right_index=True)
        else:
            result_data_frame = predicates_data_frame

        # Drop the nulled-out rows; only matching rows make it into the result.
        result = result_data_frame[match_predicate_mask]

        if self._transform_spec:
            result = self._transform_spec.func(result)

        return pa.Table.from_pandas(result, preserve_index=False)