in petastorm/etl/dataset_metadata.py [0:0]
@contextmanager
def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_summary_metadata=False,
                        filesystem_factory=None):
"""
A Context Manager which handles all the initialization and finalization necessary
to generate metadata for a petastorm dataset. This should be used around your
spark logic to materialize a dataset (specifically the writing of parquet output).
Note: Any rowgroup indexing should happen outside the materialize_dataset block
Example:
>>> spark = SparkSession.builder...
>>> ds_url = 'hdfs:///path/to/my/dataset'
>>> with materialize_dataset(spark, ds_url, MyUnischema, 64):
>>> spark.sparkContext.parallelize(range(0, 10)).
>>> ...
>>> .write.parquet(ds_url)
>>> indexer = [SingleFieldIndexer(...)]
>>> build_rowgroup_index(ds_url, spark.sparkContext, indexer)
    A user may provide their own recipe for creating a pyarrow filesystem object via the
    ``filesystem_factory`` argument (otherwise, petastorm will create a default one based on the url).

    The following example shows how a custom pyarrow HDFS filesystem, instantiated using the
    ``libhdfs`` driver, can be used during Petastorm dataset generation:

    >>> resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
    >>>                               hdfs_driver='libhdfs')
    >>> with materialize_dataset(..., filesystem_factory=resolver.filesystem_factory()):
    >>>     ...
    :param spark: The spark session you are using
    :param dataset_url: The dataset url to output your dataset to (e.g. ``hdfs:///path/to/dataset``)
    :param schema: The :class:`petastorm.unischema.Unischema` definition of your dataset
    :param row_group_size_mb: The parquet row group size to use for your dataset
    :param use_summary_metadata: Whether to use the parquet summary metadata for row group indexing or a custom
        indexing method. The custom indexing method is more scalable for very large datasets.
    :param filesystem_factory: A filesystem factory function to be used when saving Petastorm specific metadata to the
        Parquet store.
    """
    spark_config = {}
    _init_spark(spark, spark_config, row_group_size_mb, use_summary_metadata)
    yield

    # After job completes, add the unischema metadata and check for the metadata summary file
    if filesystem_factory is None:
        resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
                                      user=spark.sparkContext.sparkUser())
        filesystem_factory = resolver.filesystem_factory()
        dataset_path = resolver.get_dataset_path()
    else:
        dataset_path = urlparse(dataset_url).path
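    # The factory (rather than a ready filesystem object) is kept around because it is also handed to
    # the Spark job below, letting executors build their own filesystem instances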
    filesystem = filesystem_factory()

    dataset = pq.ParquetDataset(
        dataset_path,
        filesystem=filesystem,
        validate_schema=False)
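
    # Attach the serialized Unischema to the dataset's Parquet metadata so readers can recover the schema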
    _generate_unischema_metadata(dataset, schema)
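    # Without parquet summary metadata, store the per-file row group counts as petastorm-specific
    # metadata so readers can still locate row groups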
    if not use_summary_metadata:
        _generate_num_row_groups_per_file(dataset, spark.sparkContext, filesystem_factory)

    # Reload the dataset to take into account the new metadata
    dataset = pq.ParquetDataset(
        dataset_path,
        filesystem=filesystem,
        validate_schema=False)
    try:
        # Try to load the row groups; if it fails, the metadata was not generated properly
        load_row_groups(dataset)
    except PetastormMetadataError:
        raise PetastormMetadataGenerationError(
            'Could not find summary metadata file. The dataset will exist, but you will need'
            ' to execute petastorm-generate-metadata.py to generate the necessary metadata'
            ' before you can read your dataset.'
            ' Try increasing spark driver memory next time and make sure you are'
            ' using parquet-mr >= 1.8.3')
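
    # Restore the Spark/Hadoop parquet settings that _init_spark stashed in spark_config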
    _cleanup_spark(spark, spark_config, row_group_size_mb)
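
# A minimal end-to-end usage sketch (not part of this module), assuming a Unischema named
# HelloWorldSchema and a row_generator(index) -> dict helper defined elsewhere; dict_to_spark_row
# and Unischema.as_spark_schema() come from petastorm.unischema:
#
#     with materialize_dataset(spark, output_url, HelloWorldSchema, row_group_size_mb=256):
#         rows_rdd = spark.sparkContext.parallelize(range(100)) \
#             .map(row_generator) \
#             .map(lambda r: dict_to_spark_row(HelloWorldSchema, r))
#         spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
#             .write.mode('overwrite').parquet(output_url)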