in petastorm/etl/dataset_metadata.py [0:0]
def load_row_groups(dataset):
"""
Load dataset row group pieces from metadata
:param dataset: parquet dataset object.
:param allow_read_footers: whether to allow reading parquet footers if there is no better way
to load row group information
:return: splitted pieces, one piece per row group
"""
    # We try to get row group information from the metadata file
metadata = dataset.metadata
common_metadata = dataset.common_metadata
if not metadata and not common_metadata:
# If we are inferring the schema we allow reading the footers to get the row group information
return _split_row_groups_from_footers(dataset)
if metadata and metadata.num_row_groups > 0:
# If the metadata file exists and has row group information we use it to split the dataset pieces
return _split_row_groups(dataset)
    # Otherwise fall back to the legacy per-file row group counts stored in the _common_metadata key-value metadata
dataset_metadata_dict = common_metadata.metadata
if ROW_GROUPS_PER_FILE_KEY not in dataset_metadata_dict:
raise PetastormMetadataError(
'Could not find row group metadata in _common_metadata file.'
' Use materialize_dataset(..) in petastorm.etl.dataset_metadata.py to generate'
' this file in your ETL code.'
' You can generate it on an existing dataset using petastorm-generate-metadata.py')
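    # The value stored under ROW_GROUPS_PER_FILE_KEY is a JSON-encoded mapping of relative file path
    # to the number of row groups in that file, e.g. '{"part_0.parquet": 3}' (illustrative example,
    # not taken from a real dataset).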
    row_groups_per_file = json.loads(dataset_metadata_dict[ROW_GROUPS_PER_FILE_KEY].decode())
rowgroups = []
    # Force the order of pieces. The order is not deterministic since it depends on the multithreaded
    # directory listing implementation inside pyarrow. We stabilize the order here so that we get a
    # reproducible order when piece shuffling is off. This also enables implementing piece shuffling
    # given a seed.
sorted_pieces = sorted(dataset.pieces, key=attrgetter('path'))
for piece in sorted_pieces:
# If we are not using absolute paths, we need to convert the path to a relative path for
# looking up the number of row groups.
row_groups_key = os.path.relpath(piece.path, dataset.paths)
        # When reading a Parquet store directly from an S3 bucket, a separate piece is created for the
        # root directory. This is not a real "piece" and we won't have row_groups_per_file recorded for it.
if row_groups_key != ".":
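            # Emit one piece per row group of this file. compat_make_parquet_piece is assumed to be a
            # petastorm compatibility shim that constructs a parquet dataset piece consistently across
            # pyarrow versions.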
for row_group in range(row_groups_per_file[row_groups_key]):
rowgroups.append(compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
partition_keys=piece.partition_keys))
return rowgroups
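
# Illustrative usage sketch (not part of the original module). load_row_groups expects a pyarrow
# ParquetDataset; the dataset path below is an assumption made up for the example.
#
#   import pyarrow.parquet as pq
#
#   dataset = pq.ParquetDataset('/tmp/hello_world_dataset')
#   pieces = load_row_groups(dataset)  # one piece per row group in the dataset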