in petastorm/etl/rowgroup_indexing.py [0:0]
def build_rowgroup_index(dataset_url, spark_context, indexers, hdfs_driver='libhdfs3'):
"""
Build index for given list of fields to use for fast rowgroup selection
:param dataset_url: (str) the url for the dataset (or a path if you would like to use the default hdfs config)
:param spark_context: (SparkContext)
:param indexers: list of objects to build row groups indexes. Should support RowGroupIndexerBase interface
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:return: None, upon successful completion the rowgroup predicates will be saved to _metadata file
"""
    if dataset_url and dataset_url[-1] == '/':
        dataset_url = dataset_url[:-1]

    # Create a pyarrow filesystem for the dataset url and open the Parquet dataset
    resolver = FilesystemResolver(dataset_url, spark_context._jsc.hadoopConfiguration(),
                                  hdfs_driver=hdfs_driver, user=spark_context.sparkUser())
    dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
                                validate_schema=False)

    split_pieces = dataset_metadata.load_row_groups(dataset)
    schema = dataset_metadata.get_schema(dataset)

    # We need a direct reference to the partitions object (it is captured by the indexing lambda below)
    partitions = dataset.partitions

    # The index relies on the ordering of the split dataset pieces. That ordering is determined by how the
    # dataset pieces are split and sorted; although it should not change, keep in mind that a change there
    # could break the index.
    piece_info_list = []
    for piece_index, piece in enumerate(split_pieces):
        piece_info_list.append(PieceInfo(piece_index, piece.path, piece.row_group, piece.partition_keys))
    start_time = time.time()

    # Distribute indexing of the pieces across the Spark cluster and merge the partial indexers
    piece_info_rdd = spark_context.parallelize(piece_info_list, min(len(piece_info_list), PARALLEL_SLICE_NUM))
    indexer_rdd = piece_info_rdd.map(lambda piece_info: _index_columns(piece_info, dataset_url, partitions,
                                                                       indexers, schema, hdfs_driver=hdfs_driver))
    indexer_list = indexer_rdd.reduce(_combine_indexers)

    # Serialize the combined indexers and store them in the dataset's Parquet _metadata
    indexer_dict = {indexer.index_name: indexer for indexer in indexer_list}
    serialized_indexers = pickle.dumps(indexer_dict, pickle.HIGHEST_PROTOCOL)
    utils.add_to_dataset_metadata(dataset, ROWGROUPS_INDEX_KEY, serialized_indexers)

    logger.info("Elapsed time of index creation: %f s", time.time() - start_time)
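
A minimal usage sketch (not part of this file): it assumes a petastorm dataset has already been materialized at the given URL and that its Unischema contains an 'id' field; the dataset URL, app name, and index/field names are illustrative placeholders. It uses SingleFieldIndexer from petastorm.etl.rowgroup_indexers as the indexer implementation, assuming its (index_name, field_name) constructor.

from pyspark import SparkContext

from petastorm.etl.rowgroup_indexers import SingleFieldIndexer
from petastorm.etl.rowgroup_indexing import build_rowgroup_index

sc = SparkContext(appName='build_rowgroup_index_example')
try:
    # 'hdfs://namenode/datasets/my_dataset' and the 'id' field are placeholders for illustration.
    # SingleFieldIndexer maps values of a single field to the rowgroups that contain them.
    indexers = [SingleFieldIndexer('id_index', 'id')]
    build_rowgroup_index('hdfs://namenode/datasets/my_dataset', sc, indexers)
finally:
    sc.stop()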