def build_rowgroup_index()

in petastorm/etl/rowgroup_indexing.py
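
The function body below relies on several module-level names. A minimal sketch of the imports and helpers it appears to assume is shown here; the module paths follow petastorm's layout, and the PieceInfo field names are inferred from how the namedtuple is constructed in the loop below, so treat this as an illustration rather than a verbatim copy of the file header.

import logging
import pickle
import time
from collections import namedtuple

import pyarrow.parquet as pq

from petastorm import utils
from petastorm.etl import dataset_metadata
from petastorm.fs_utils import FilesystemResolver

logger = logging.getLogger(__name__)

# Assumed field names, inferred from the PieceInfo(...) call in the loop below.
PieceInfo = namedtuple('PieceInfo', ['piece_index', 'path', 'row_group', 'partition_keys'])

# PARALLEL_SLICE_NUM, ROWGROUPS_INDEX_KEY, _index_columns and _combine_indexers are also defined in this module.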


def build_rowgroup_index(dataset_url, spark_context, indexers, hdfs_driver='libhdfs3'):
    """
    Build index for given list of fields to use for fast rowgroup selection
    :param dataset_url: (str) the url for the dataset (or a path if you would like to use the default hdfs config)
    :param spark_context: (SparkContext)
    :param indexers: list of objects to build row groups indexes. Should support RowGroupIndexerBase interface
    :param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
    libhdfs (java through JNI) or libhdfs3 (C++)
    :return: None, upon successful completion the rowgroup predicates will be saved to _metadata file
    """

    if dataset_url and dataset_url[-1] == '/':
        dataset_url = dataset_url[:-1]

    # Create pyarrow file system
    resolver = FilesystemResolver(dataset_url, spark_context._jsc.hadoopConfiguration(),
                                  hdfs_driver=hdfs_driver, user=spark_context.sparkUser())
    dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
                                validate_schema=False)

    # Split the dataset into one piece per row group and recover the Unischema stored in the dataset metadata.
    split_pieces = dataset_metadata.load_row_groups(dataset)
    schema = dataset_metadata.get_schema(dataset)

    # Keep a direct reference to the partitions object so it can be shipped to the Spark workers below
    partitions = dataset.partitions
    pieces_num = len(split_pieces)
    piece_info_list = []
    for piece_index in range(pieces_num):
        # The indexes rely on the ordering of the split dataset pieces. That ordering is determined by how the
        # pieces are split and sorted; it should not change, but if it ever did, these indexes would break.
        piece = split_pieces[piece_index]
        piece_info_list.append(PieceInfo(piece_index, piece.path, piece.row_group, piece.partition_keys))

    start_time = time.time()
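    # Distribute the indexing: each Spark task runs the indexers over its pieces (_index_columns), and the
    # resulting partial indexers are merged pairwise in the reduce step (_combine_indexers).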
    piece_info_rdd = spark_context.parallelize(piece_info_list, min(len(piece_info_list), PARALLEL_SLICE_NUM))
    indexer_rdd = piece_info_rdd.map(lambda piece_info: _index_columns(piece_info, dataset_url, partitions,
                                                                       indexers, schema, hdfs_driver=hdfs_driver))
    indexer_list = indexer_rdd.reduce(_combine_indexers)

    # Key the merged indexers by index name, pickle the mapping and attach it to the dataset metadata so that
    # readers can later use it for fast rowgroup selection.
    indexer_dict = {indexer.index_name: indexer for indexer in indexer_list}
    serialized_indexers = pickle.dumps(indexer_dict, pickle.HIGHEST_PROTOCOL)
    utils.add_to_dataset_metadata(dataset, ROWGROUPS_INDEX_KEY, serialized_indexers)
    logger.info("Elapsed time of index creation: %f s", (time.time() - start_time))
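
A minimal usage sketch, assuming a live SparkContext and a materialized petastorm dataset that contains the field being indexed. SingleFieldIndexer is the single-field indexer shipped in petastorm.etl.rowgroup_indexers; the dataset URL, field name, index name and the exact constructor signature shown here are assumptions for illustration.

from pyspark import SparkContext

from petastorm.etl.rowgroup_indexers import SingleFieldIndexer
from petastorm.etl.rowgroup_indexing import build_rowgroup_index

sc = SparkContext(appName='build-rowgroup-index')

# Hypothetical: index the 'id' field so readers can jump straight to the row groups that contain a given value.
indexers = [SingleFieldIndexer('id_index', 'id')]

build_rowgroup_index('hdfs:///some/dataset', sc, indexers, hdfs_driver='libhdfs3')

Once the call returns, the pickled indexers live in the dataset's _metadata file; depending on the petastorm version, a reader can consult them through its rowgroup-selector mechanism to skip row groups that cannot contain the requested values.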