def materialize_dataset()

in petastorm/etl/dataset_metadata.py


@contextmanager
def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_summary_metadata=False,
                        filesystem_factory=None):
    """
    A context manager that handles all the initialization and finalization necessary
    to generate metadata for a Petastorm dataset. It should wrap the Spark logic that
    materializes the dataset (specifically, the writing of the Parquet output).

    Note: any row group indexing should happen outside the ``materialize_dataset`` block.

    Example:

    >>> spark = SparkSession.builder...
    >>> ds_url = 'hdfs:///path/to/my/dataset'
    >>> with materialize_dataset(spark, ds_url, MyUnischema, 64):
    >>>   spark.sparkContext.parallelize(range(0, 10)).
    >>>     ...
    >>>     .write.parquet(ds_url)
    >>> indexer = [SingleFieldIndexer(...)]
    >>> build_rowgroup_index(ds_url, spark.sparkContext, indexer)

    A user may provide their own recipe for creating the pyarrow filesystem object via the
    ``filesystem_factory`` argument (otherwise, Petastorm will create a default one based on the URL).

    The following example shows how a custom pyarrow HDFS filesystem, instantiated using the
    ``libhdfs`` driver, can be used during Petastorm dataset generation:

    >>> resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
    >>>                               hdfs_driver='libhdfs')
    >>> with materialize_dataset(..., filesystem_factory=resolver.filesystem_factory()):
    >>>     ...


    :param spark: The Spark session you are using
    :param dataset_url: The dataset URL to write your dataset to (e.g. ``hdfs:///path/to/dataset``)
    :param schema: The :class:`petastorm.unischema.Unischema` definition of your dataset
    :param row_group_size_mb: The Parquet row group size, in megabytes, to use for your dataset
    :param use_summary_metadata: Whether to use the Parquet summary metadata for row group indexing or a custom
      indexing method. The custom indexing method is more scalable for very large datasets.
    :param filesystem_factory: A filesystem factory function to be used when saving Petastorm-specific metadata to the
      Parquet store.
    """
    spark_config = {}
    _init_spark(spark, spark_config, row_group_size_mb, use_summary_metadata)
    yield
    # After job completes, add the unischema metadata and check for the metadata summary file
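    # Resolve a filesystem for the dataset: either build a default resolver from the dataset URL
    # and the driver's Hadoop configuration, or use the factory supplied by the caller.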
    if filesystem_factory is None:
        resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
                                      user=spark.sparkContext.sparkUser())
        filesystem_factory = resolver.filesystem_factory()
        dataset_path = resolver.get_dataset_path()
    else:
        dataset_path = urlparse(dataset_url).path
    filesystem = filesystem_factory()

    dataset = pq.ParquetDataset(
        dataset_path,
        filesystem=filesystem,
        validate_schema=False)

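    # Persist the serialized Unischema into the dataset's Parquet metadata so that readers can
    # reconstruct the schema later.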
    _generate_unischema_metadata(dataset, schema)
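    # Without Parquet summary metadata, record the number of row groups per file instead; this is
    # the custom indexing method mentioned in the docstring.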
    if not use_summary_metadata:
        _generate_num_row_groups_per_file(dataset, spark.sparkContext, filesystem_factory)

    # Reload the dataset to take into account the new metadata
    dataset = pq.ParquetDataset(
        dataset_path,
        filesystem=filesystem,
        validate_schema=False)
    try:
        # Try to load the row groups, if it fails that means the metadata was not generated properly
        load_row_groups(dataset)
    except PetastormMetadataError:
        raise PetastormMetadataGenerationError(
            'Could not find summary metadata file. The dataset will exist but you will need'
            ' to execute petastorm-generate-metadata.py before you can read your dataset '
            ' in order to generate the necessary metadata.'
            ' Try increasing spark driver memory next time and making sure you are'
            ' using parquet-mr >= 1.8.3')

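    # Restore the Spark/Hadoop Parquet settings that were overridden by _init_spark().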
    _cleanup_spark(spark, spark_config, row_group_size_mb)
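
A minimal end-to-end sketch of how ``materialize_dataset`` is typically used to write a dataset.
The schema, row generator, and output path below are illustrative and not part of this module:

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import NdarrayCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

# Hypothetical schema: an integer id and a small image tensor per row.
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image', np.uint8, (32, 32, 3), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single row as a dict keyed by the Unischema field names."""
    return {'id': x,
            'image': np.random.randint(0, 255, dtype=np.uint8, size=(32, 32, 3))}


def generate_dataset(output_url='file:///tmp/hello_world_dataset', rowgroup_size_mb=256):
    spark = SparkSession.builder.master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # materialize_dataset configures Spark's Parquet output on entry and, on exit, writes the
    # Petastorm metadata (Unischema, row group index) next to the Parquet files.
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(10)) \
            .map(row_generator) \
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)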