in petastorm/arrow_reader_worker.py [0:0]
def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
    """Main worker function. Loads all rows matching the predicate from a rowgroup and publishes them.

    Looks up the requested piece (a single row-group in a parquet file). If a predicate is specified,
    columns needed by the predicate are loaded first. If no rows in the rowgroup match the predicate criteria
    the rest of the columns are not loaded.

    The piece's file is opened only when rows actually have to be read (i.e. inside the local-cache fill
    function, or on the predicate path), and the handle is closed as soon as that read finishes, even if the
    read raises.

    :param piece_index: Index into ``self._split_pieces`` selecting the piece (row-group) to read. It is also
        part of the local-cache key.
    :param worker_predicate: Optional row predicate. When truthy, rows are loaded through
        ``self._load_rows_with_predicate``; otherwise all rows are loaded through ``self._load_rows`` via the
        local cache.
    :param shuffle_row_drop_partition: A tuple 2 of the current row drop partition and the total number
        of partitions.
    :return: None. A non-empty result is handed to ``self.publish_func``.
    :raises RuntimeError: If a local cache other than ``NullCache`` is combined with a predicate, or with more
        than one row-drop partition.
    """
    if not self._dataset:
        # The dataset object is created on the first call and reused by later calls.
        self._dataset = pq.ParquetDataset(
            self._dataset_path_or_paths,
            filesystem=self._filesystem,
            validate_schema=False, filters=self._arrow_filters)
        if self._dataset.partitions is None:
            # When read from parquet file list, the `dataset.partitions` will be None.
            # But other petastorm code require at least an empty `ParquetPartitions` object.
            self._dataset.partitions = pq.ParquetPartitions()

    piece = self._split_pieces[piece_index]

    # Validate the cache configuration before touching the file system, so a misconfiguration fails
    # without doing any I/O or leaving a file handle behind.
    if not isinstance(self._local_cache, NullCache):
        if worker_predicate:
            raise RuntimeError('Local cache is not supported together with predicates, '
                               'unless the dataset is partitioned by the column the predicate operates on.')
        if shuffle_row_drop_partition[1] != 1:
            raise RuntimeError('Local cache is not supported together with shuffle_row_drop_partitions > 1')

    def read_piece(load_fn, *load_args):
        # Open the piece's file, let `load_fn` read it through a ParquetFile, and always release the handle.
        source = self._dataset.fs.open(piece.path)
        try:
            return load_fn(ParquetFile(source), piece, *load_args)
        finally:
            source.close()

    if worker_predicate:
        all_cols = read_piece(self._load_rows_with_predicate, worker_predicate, shuffle_row_drop_partition)
    else:
        # Using hash of the dataset path with the relative path in order to:
        #  1. Make sure if a common cache serves multiple processes (e.g. redis), we don't have conflicts
        #  2. Dataset path is hashed, to make sure we don't create too long keys, which maybe incompatible with
        #     some cache implementations
        #  3. Still leave relative path and the piece_index in plain text to make it easier to debug
        if isinstance(self._dataset_path_or_paths, list):
            path_str = ','.join(self._dataset_path_or_paths)
        else:
            path_str = self._dataset_path_or_paths
        cache_key = '{}:{}:{}'.format(hashlib.md5(path_str.encode('utf-8')).hexdigest(),
                                      piece.path, piece_index)
        # The file is opened inside the fill function. Assuming the cache only calls it on a miss
        # (TODO confirm for every cache implementation), a cache hit performs no file I/O.
        all_cols = self._local_cache.get(cache_key,
                                         lambda: read_piece(self._load_rows, shuffle_row_drop_partition))

    if all_cols:
        self.publish_func(all_cols)