in petastorm/etl/petastorm_generate_metadata.py [0:0]
from pydoc import locate

from pyarrow import parquet as pq

# Module-level imports this function relies on (the exact module paths below are assumed
# from the petastorm package layout; they are not shown in this excerpt).
from petastorm.etl.dataset_metadata import ROW_GROUPS_PER_FILE_KEY, get_schema, materialize_dataset
from petastorm.etl.rowgroup_indexing import ROWGROUPS_INDEX_KEY
from petastorm.fs_utils import FilesystemResolver
from petastorm.unischema import Unischema
from petastorm.utils import add_to_dataset_metadata


def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_summary_metadata=False,
                                hdfs_driver='libhdfs3'):
"""
Generates metadata necessary to read a petastorm dataset to an existing dataset.
:param spark: spark session
:param dataset_url: url of existing dataset
:param unischema_class: (optional) fully qualified dataset unischema class. If not specified will attempt
to find one already in the dataset. (e.g.
:class:`examples.hello_world.generate_hello_world_dataset.HelloWorldSchema`)
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:param user: String denoting username when connecting to HDFS
"""
    sc = spark.sparkContext
    resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(), hdfs_driver=hdfs_driver,
                                  user=spark.sparkContext.sparkUser())
    fs = resolver.filesystem()
    dataset = pq.ParquetDataset(
        resolver.get_dataset_path(),
        filesystem=fs,
        validate_schema=False)
    if unischema_class:
        schema = locate(unischema_class)
        if not isinstance(schema, Unischema):
            raise ValueError('The specified class %s is not an instance of a petastorm.Unischema object.'
                             % unischema_class)
    else:
        try:
            schema = get_schema(dataset)
        except ValueError:
            raise ValueError('Unischema class could not be located in the existing dataset;'
                             ' please specify it explicitly')
    # In order to be backwards compatible, we retrieve the common metadata from the dataset before
    # overwriting it, so that the row group indexes and the old row-group-per-file index are preserved.
    arrow_metadata = dataset.common_metadata or None
    with materialize_dataset(spark, dataset_url, schema, use_summary_metadata=use_summary_metadata,
                             filesystem_factory=resolver.filesystem_factory()):
        if use_summary_metadata:
            # Inside the materialize_dataset context we only need to write the summary metadata file,
            # since the schema itself is written by the context manager.
            # We use the java ParquetOutputCommitter to write the metadata file for the existing dataset;
            # it reads all the footers of the dataset in parallel and merges them.
            hadoop_config = sc._jsc.hadoopConfiguration()
            Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
            parquet_output_committer = sc._gateway.jvm.org.apache.parquet.hadoop.ParquetOutputCommitter
            parquet_output_committer.writeMetaDataFile(hadoop_config, Path(dataset_url))

    spark.stop()
    if use_summary_metadata and arrow_metadata:
        # Calling writeMetaDataFile overwrites the _common_metadata file, which could contain schema information
        # or row group indexers. We therefore retain that information and add it back to the new
        # _common_metadata file. (With the old legacy metadata method this file won't be deleted.)
        base_schema = arrow_metadata.schema.to_arrow_schema()
        metadata_dict = base_schema.metadata
        if ROW_GROUPS_PER_FILE_KEY in metadata_dict:
            add_to_dataset_metadata(dataset, ROW_GROUPS_PER_FILE_KEY, metadata_dict[ROW_GROUPS_PER_FILE_KEY])
        if ROWGROUPS_INDEX_KEY in metadata_dict:
            add_to_dataset_metadata(dataset, ROWGROUPS_INDEX_KEY, metadata_dict[ROWGROUPS_INDEX_KEY])
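

# Example usage (a minimal sketch, not part of the module): regenerate petastorm metadata for an
# existing dataset from a standalone script. The Spark master and the dataset URL below are
# placeholders chosen for illustration, not values taken from the petastorm sources.
if __name__ == '__main__':
    from pyspark.sql import SparkSession

    session = SparkSession.builder \
        .master('local[2]') \
        .appName('petastorm_generate_metadata') \
        .getOrCreate()

    # unischema_class is omitted, so the Unischema is looked up inside the dataset itself.
    generate_petastorm_metadata(session, 'hdfs:///tmp/hello_world_dataset',
                                use_summary_metadata=False)
    # Note: generate_petastorm_metadata() stops the Spark session before returning.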