in petastorm/reader.py [0:0]
def _apply_predicate_to_row_groups(self, dataset, row_groups, predicate):
"""Filters the list of row group indexes using rowgroup selector object. Returns a modified list of rowgroup
indexes and a list of worker_predicate: predicates that could not be applied at this level
(parquet partitioning)."""
if predicate:
if not isinstance(predicate, PredicateBase):
raise ValueError('predicate parameter is expected to be derived from PredicateBase')
predicate_fields = predicate.get_fields()
if set(predicate_fields) == dataset.partitions.partition_names:
assert len(dataset.partitions.partition_names) == 1, \
'Datasets with only a single partition level supported at the moment'
filtered_row_group_indexes = []
for piece_index, piece in enumerate(row_groups):
partition_name, partition_index = piece.partition_keys[0]
partition_value = dataset.partitions[0].keys[partition_index]
# Convert partition value to correct type per the schema
partition_value = self.schema.fields[partition_name].numpy_dtype(partition_value)
if predicate.do_include({partition_name: partition_value}):
filtered_row_group_indexes.append(piece_index)
worker_predicate = None
else:
filtered_row_group_indexes = list(range(len(row_groups)))
worker_predicate = predicate
else:
filtered_row_group_indexes = list(range(len(row_groups)))
worker_predicate = None
return filtered_row_group_indexes, worker_predicate