def _init_spark()

in petastorm/etl/dataset_metadata.py


def _init_spark(spark, current_spark_config, row_group_size_mb=None, use_summary_metadata=False):
    """
    Initializes spark and hdfs config with necessary options for petastorm datasets
    before running the spark job.
    """

    # It's important to keep the pyspark import local: when placed at the module level it somehow interferes with
    # the namedtuple serialization code, and we end up getting UnischemaField objects depickled without the
    # overridden __eq__ and __hash__ methods.
    import pyspark
    from packaging import version  # provides version.parse; imported at module level in the original file
    _PYSPARK_BEFORE_24 = version.parse(pyspark.__version__) < version.parse('2.4')

    hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()

    # Store current values so we can restore them later
    current_spark_config['parquet.summary.metadata.level'] = \
        hadoop_config.get('parquet.summary.metadata.level')
    current_spark_config['parquet.enable.summary-metadata'] = \
        hadoop_config.get('parquet.enable.summary-metadata')
    current_spark_config['parquet.summary.metadata.propagate-errors'] = \
        hadoop_config.get('parquet.summary.metadata.propagate-errors')
    current_spark_config['parquet.block.size.row.check.min'] = \
        hadoop_config.get('parquet.block.size.row.check.min')
    current_spark_config['parquet.row-group.size.row.check.min'] = \
        hadoop_config.get('parquet.row-group.size.row.check.min')
    current_spark_config['parquet.block.size'] = \
        hadoop_config.get('parquet.block.size')

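    # Older parquet-mr (bundled with pyspark < 2.4) only understands the boolean
    # parquet.enable.summary-metadata flag; newer versions replace it with the
    # parquet.summary.metadata.level setting.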
    if _PYSPARK_BEFORE_24:
        hadoop_config.setBoolean("parquet.enable.summary-metadata", use_summary_metadata)
    else:
        hadoop_config.set('parquet.summary.metadata.level', "ALL" if use_summary_metadata else "NONE")

    # Our atg fork includes https://github.com/apache/parquet-mr/pull/502, which adds this option.
    # It forces a job to fail if the summary metadata files cannot be created, instead of
    # letting them silently fail to be written.
    hadoop_config.setBoolean('parquet.summary.metadata.propagate-errors', True)
    # In our atg fork this config is called parquet.block.size.row.check.min; in newer parquet
    # versions it is renamed to parquet.row-group.size.row.check.min. We set both for
    # backwards compatibility.
    hadoop_config.setInt('parquet.block.size.row.check.min', 3)
    hadoop_config.setInt('parquet.row-group.size.row.check.min', 3)
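    # parquet.block.size controls the Parquet row group size in bytes, so convert the
    # requested size from megabytes.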
    if row_group_size_mb:
        hadoop_config.setInt('parquet.block.size', row_group_size_mb * 1024 * 1024)
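

Below is a minimal usage sketch (not part of the original file). It assumes a local SparkSession and the
module-level imports shown above; in petastorm this call is normally made for you by the materialize_dataset
context manager in the same module, which also restores the saved configuration afterwards.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[2]').appName('petastorm-demo').getOrCreate()

saved_config = {}  # _init_spark records the pre-existing Hadoop config values here
_init_spark(spark, saved_config, row_group_size_mb=256, use_summary_metadata=False)
# ... run the Spark job that writes the petastorm dataset ...
# saved_config now holds the original values so they can be written back once the job is done.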