# _split_row_groups() — excerpt from petastorm/etl/dataset_metadata.py

def _split_row_groups(dataset):
    """Expand each piece of ``dataset`` into one piece per row group.

    Requires the dataset to carry parquet summary-file metadata (row group
    information); raises ``NotImplementedError`` otherwise.

    :param dataset: a parquet dataset whose ``metadata`` holds per-row-group
        information (as produced by parquet summary files).
    :return: a list of single-row-group pieces, one per row group of every
        piece that appears in the metadata.
    """
    metadata = dataset.metadata
    if not metadata or metadata.num_row_groups == 0:
        raise NotImplementedError("split_row_groups is only implemented "
                                  "if dataset has parquet summary files "
                                  "with row group information")

    # Tally how many row groups each file contributes. The metadata file
    # records file paths relative to the dataset base directory.
    row_group_counts = {}
    for index in range(metadata.num_row_groups):
        file_path = metadata.row_group(index).column(0).file_path
        row_group_counts[file_path] = row_group_counts.get(file_path, 0) + 1

    dataset_dir = os.path.normpath(os.path.dirname(dataset.metadata_path))

    split_pieces = []
    for piece in dataset.pieces:
        # Pieces carry absolute paths; convert to a path relative to the
        # dataset base dir so it can be matched against the metadata keys.
        relative_path = os.path.relpath(piece.path, dataset_dir)

        # A path absent from the metadata means that file has no row
        # groups, so it contributes no pieces.
        if relative_path not in row_group_counts:
            continue

        split_pieces.extend(
            compat_make_parquet_piece(piece.path, dataset.fs.open,
                                      row_group=row_group_index,
                                      partition_keys=piece.partition_keys)
            for row_group_index in range(row_group_counts[relative_path]))

    return split_pieces