in petastorm/arrow_reader_worker.py [0:0]
def _load_rows_with_predicate(self, pq_file, piece, worker_predicate, shuffle_row_drop_partition):
    """Loads all rows that match a predicate from a piece.

    The columns the predicate needs are read and evaluated first; the remaining schema columns are read
    only if at least one row matched.

    :param pq_file: Parquet file handle; forwarded unchanged to ``self._read_with_shuffle_row_drop``.
    :param piece: The piece (rowgroup) to read; forwarded unchanged to ``self._read_with_shuffle_row_drop``.
    :param worker_predicate: Object exposing ``get_fields()`` (names of the columns the predicate needs) and
        ``do_include(data_frame)``, which must return a boolean pandas Series marking the rows to keep.
    :param shuffle_row_drop_partition: Forwarded unchanged to ``self._read_with_shuffle_row_drop``.
    :return: An empty list if no row matched the predicate. Otherwise, a ``pyarrow.Table`` with the matching
        rows (passed through ``self._transform_spec.func`` first when a transform spec is set), built
        without the pandas index.
    :raises ValueError: If the predicate names no fields, or names fields that are not in ``self._schema``.
    """
    # 1. Read all columns needed by the predicate.
    # 2. Apply the predicate. If nothing matches, exit early.
    # 3. Read the remaining columns.

    # Split all column names into the ones needed by the predicate and the rest.
    predicate_column_names = set(worker_predicate.get_fields())
    if not predicate_column_names:
        raise ValueError('At least one field name must be returned by predicate\'s get_fields() method')

    all_schema_names = set(field.name for field in self._schema.fields.values())
    invalid_column_names = predicate_column_names - all_schema_names
    if invalid_column_names:
        # Sorted so the message text is deterministic (set iteration order is not).
        raise ValueError('At least some column names requested by the predicate ({}) '
                         'are not valid schema names: ({})'.format(', '.join(sorted(invalid_column_names)),
                                                                   ', '.join(sorted(all_schema_names))))

    # 'Other columns' are loaded only if at least one row in the rowgroup matched the predicate.
    other_column_names = all_schema_names - predicate_column_names

    # Read and evaluate only the columns needed by the predicate.
    predicates_table = self._read_with_shuffle_row_drop(piece, pq_file, predicate_column_names,
                                                        shuffle_row_drop_partition)
    predicates_data_frame = predicates_table.to_pandas()
    match_predicate_mask = worker_predicate.do_include(predicates_data_frame)

    # Don't have anything left after filtering? Exit early.
    if match_predicate_mask.map(operator.not_).all():
        return []

    # Drop non-matching rows with the mask rather than overwriting them with None: assigning None to some
    # rows upcasts the whole column (int64 -> float64, bool -> object), which changes the dtype of the rows
    # we keep and rounds int64 values above 2**53.
    predicates_data_frame = predicates_data_frame[match_predicate_mask]

    if other_column_names:
        # Read remaining columns.
        other_table = self._read_with_shuffle_row_drop(piece, pq_file, other_column_names,
                                                       shuffle_row_drop_partition)
        # Relies on both reads returning the same rows in the same order, so the mask (indexed like
        # predicates_data_frame) lines up; the index-based merge below depends on that as well.
        other_data_frame = other_table.to_pandas()[match_predicate_mask]

        # Partition-by columns will appear in both other and predicate data frames. Deduplicate.
        columns_from_predicates = predicates_data_frame.columns.difference(other_data_frame.columns)
        result = pd.merge(predicates_data_frame[columns_from_predicates], other_data_frame,
                          copy=False, left_index=True, right_index=True)
    else:
        result = predicates_data_frame

    if self._transform_spec:
        result = self._transform_spec.func(result)

    return pa.Table.from_pandas(result, preserve_index=False)