def generate_petastorm_metadata()

in petastorm/etl/petastorm_generate_metadata.py


def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_summary_metadata=False,
                                hdfs_driver='libhdfs3'):
    """
    Generates the metadata necessary to read a petastorm dataset and adds it to an existing dataset.

    :param spark: spark session
    :param dataset_url: url of existing dataset
    :param unischema_class: (optional) fully qualified dataset unischema class. If not specified will attempt
        to find one already in the dataset. (e.g.
        :class:`examples.hello_world.generate_hello_world_dataset.HelloWorldSchema`)
    :param use_summary_metadata: (optional) if True, also write the summary ``_metadata`` file for the existing
        dataset by merging the footers of all its row groups
    :param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
        libhdfs (java through JNI) or libhdfs3 (C++)
    """
    sc = spark.sparkContext

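    # Resolve the dataset URL to a concrete pyarrow filesystem and open the existing Parquet dataset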
    resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(), hdfs_driver=hdfs_driver,
                                  user=spark.sparkContext.sparkUser())
    fs = resolver.filesystem()
    dataset = pq.ParquetDataset(
        resolver.get_dataset_path(),
        filesystem=fs,
        validate_schema=False)

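    # Determine the Unischema: either load the explicitly specified class or recover the one stored in the dataset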
    if unischema_class:
        schema = locate(unischema_class)
        if not isinstance(schema, Unischema):
            raise ValueError('The specified class %s is not an instance of a petastorm.Unischema object.' %
                             unischema_class)
    else:
        try:
            schema = get_schema(dataset)
        except ValueError:
            raise ValueError('Unischema class could not be located in existing dataset,'
                             ' please specify it')

    # For backwards compatibility, retrieve the common metadata from the dataset before it is overwritten,
    # so that row group indexes and the old row-groups-per-file index can be restored afterwards
    arrow_metadata = dataset.common_metadata

    with materialize_dataset(spark, dataset_url, schema, use_summary_metadata=use_summary_metadata,
                             filesystem_factory=resolver.filesystem_factory()):
        if use_summary_metadata:
            # Inside the materialize_dataset context we only need to write the summary metadata file; the schema
            # itself is written by the context manager.
            # We use the Java ParquetOutputCommitter to write the metadata file for the existing dataset,
            # which reads all of the dataset's footers in parallel and merges them.
            hadoop_config = sc._jsc.hadoopConfiguration()
            Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
            parquet_output_committer = sc._gateway.jvm.org.apache.parquet.hadoop.ParquetOutputCommitter
            parquet_output_committer.writeMetaDataFile(hadoop_config, Path(dataset_url))

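    # Stop the Spark session; it is no longer needed once the metadata has been written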
    spark.stop()

    if use_summary_metadata and arrow_metadata:
        # writeMetaDataFile overwrites the _common_metadata file, which may have contained schema information
        # or row group indexers. We therefore retain that information and add it back to the new
        # _common_metadata file. With the old legacy metadata method this file won't be deleted.
        base_schema = arrow_metadata.schema.to_arrow_schema()
        metadata_dict = base_schema.metadata
        if ROW_GROUPS_PER_FILE_KEY in metadata_dict:
            add_to_dataset_metadata(dataset, ROW_GROUPS_PER_FILE_KEY, metadata_dict[ROW_GROUPS_PER_FILE_KEY])
        if ROWGROUPS_INDEX_KEY in metadata_dict:
            add_to_dataset_metadata(dataset, ROWGROUPS_INDEX_KEY, metadata_dict[ROWGROUPS_INDEX_KEY])
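
A minimal usage sketch, assuming a dataset has already been written to file:///tmp/hello_world_dataset and that
the HelloWorldSchema mentioned in the docstring example is importable (both are placeholders; substitute your own
dataset URL and Unischema class):

    from pyspark.sql import SparkSession

    from petastorm.etl.petastorm_generate_metadata import generate_petastorm_metadata

    spark = SparkSession.builder \
        .master('local[2]') \
        .appName('petastorm_generate_metadata') \
        .getOrCreate()

    # Note: generate_petastorm_metadata stops the Spark session before returning.
    generate_petastorm_metadata(
        spark,
        'file:///tmp/hello_world_dataset',
        unischema_class='examples.hello_world.generate_hello_world_dataset.HelloWorldSchema')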