in petastorm/etl/dataset_metadata.py [0:0]
def _split_row_groups(dataset):
    """Expand each piece of ``dataset`` into one piece per row group.

    Reads the Parquet summary metadata to find how many row groups each data
    file contains, then emits a piece for every (file, row group) pair.

    :param dataset: a parquet dataset object exposing ``metadata``,
        ``metadata_path``, ``pieces``, and ``fs`` (assumed to be a pyarrow
        ``ParquetDataset`` — confirm against callers).
    :return: list of row-group-level pieces.
    :raises NotImplementedError: if the dataset carries no summary metadata
        (row-group information is unavailable without it).
    """
    metadata = dataset.metadata
    if not metadata or metadata.num_row_groups == 0:
        raise NotImplementedError("split_row_groups is only implemented "
                                  "if dataset has parquet summary files "
                                  "with row group information")

    # Tally how many row groups live in each file. The Parquet metadata file
    # stores file paths relative to the dataset base directory.
    counts_by_relpath = {}
    for index in range(metadata.num_row_groups):
        file_path = metadata.row_group(index).column(0).file_path
        counts_by_relpath[file_path] = counts_by_relpath.get(file_path, 0) + 1

    base_path = os.path.normpath(os.path.dirname(dataset.metadata_path))

    expanded_pieces = []
    for piece in dataset.pieces:
        # Pieces carry absolute paths; relativize against the dataset base
        # dir so we can look up the row-group count recorded in the metadata.
        relative = os.path.relpath(piece.path, base_path)
        # A path absent from the metadata means that file holds no row
        # groups, so it contributes no pieces.
        if relative not in counts_by_relpath:
            continue
        expanded_pieces.extend(
            compat_make_parquet_piece(piece.path, dataset.fs.open,
                                      row_group=rg_index,
                                      partition_keys=piece.partition_keys)
            for rg_index in range(counts_by_relpath[relative]))
    return expanded_pieces