in petastorm/etl/dataset_metadata.py [0:0]
def _init_spark(spark, current_spark_config, row_group_size_mb=None, use_summary_metadata=False):
"""
Initializes spark and hdfs config with necessary options for petastorm datasets
before running the spark job.
"""
# It's important to keep pyspark import local because when placed at the top level it somehow messes up with
# namedtuple serialization code and we end up getting UnischemaFields objects depickled without overriden __eq__
# and __hash__ methods.
import pyspark
_PYSPARK_BEFORE_24 = version.parse(pyspark.__version__) < version.parse('2.4')
hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
    # Store current values so we can restore them later
    current_spark_config['parquet.summary.metadata.level'] = \
        hadoop_config.get('parquet.summary.metadata.level')
    current_spark_config['parquet.enable.summary-metadata'] = \
        hadoop_config.get('parquet.enable.summary-metadata')
    current_spark_config['parquet.summary.metadata.propagate-errors'] = \
        hadoop_config.get('parquet.summary.metadata.propagate-errors')
    current_spark_config['parquet.block.size.row.check.min'] = \
        hadoop_config.get('parquet.block.size.row.check.min')
    current_spark_config['parquet.row-group.size.row.check.min'] = \
        hadoop_config.get('parquet.row-group.size.row.check.min')
    current_spark_config['parquet.block.size'] = \
        hadoop_config.get('parquet.block.size')
    if _PYSPARK_BEFORE_24:
        hadoop_config.setBoolean("parquet.enable.summary-metadata", use_summary_metadata)
    else:
        hadoop_config.set('parquet.summary.metadata.level', "ALL" if use_summary_metadata else "NONE")
    # Our atg fork includes https://github.com/apache/parquet-mr/pull/502, which adds this
    # option. It forces a job to fail if the summary metadata files cannot be created,
    # instead of silently failing to create them.
    hadoop_config.setBoolean('parquet.summary.metadata.propagate-errors', True)
    # In our atg fork this config is called parquet.block.size.row.check.min; however, in newer
    # parquet versions it will be renamed to parquet.row-group.size.row.check.min.
    # We set both for backwards compatibility.
    hadoop_config.setInt('parquet.block.size.row.check.min', 3)
    hadoop_config.setInt('parquet.row-group.size.row.check.min', 3)
    if row_group_size_mb:
        # parquet.block.size is specified in bytes, so convert the requested row group size from MB
        hadoop_config.setInt('parquet.block.size', row_group_size_mb * 1024 * 1024)
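

# The values captured in `current_spark_config` above are meant to be restored once the Spark job
# finishes. Below is a minimal sketch of that restore step, assuming the same `current_spark_config`
# dict and a live SparkSession `spark`; the helper name is illustrative, not necessarily the exact
# cleanup helper Petastorm ships.
def _restore_spark(spark, current_spark_config):
    """Reverts the Hadoop/Parquet settings changed by _init_spark."""
    hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
    for key, value in current_spark_config.items():
        if value is not None:
            hadoop_config.set(key, value)
        else:
            # The key was not set before _init_spark ran, so remove it again.
            hadoop_config.unset(key)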
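

# For context, a hedged usage sketch: callers do not normally invoke _init_spark directly; the
# materialize_dataset() context manager in this module applies this configuration around a Spark
# write and then restores it. The schema, output path, and sizes below are illustrative
# placeholders. With row_group_size_mb=256, the code above sets parquet.block.size to
# 256 * 1024 * 1024 bytes.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

ExampleSchema = Unischema('ExampleSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
])

spark = SparkSession.builder.master('local[2]').appName('petastorm_example').getOrCreate()
output_url = 'file:///tmp/example_dataset'  # placeholder location

with materialize_dataset(spark, output_url, ExampleSchema, row_group_size_mb=256):
    rows = [dict_to_spark_row(ExampleSchema, {'id': i}) for i in range(10)]
    spark.createDataFrame(rows, ExampleSchema.as_spark_schema()) \
        .write.mode('overwrite').parquet(output_url)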
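

# Hedged illustration of what use_summary_metadata controls: when summary metadata is enabled,
# the Parquet committer typically writes a _metadata summary file next to the part files of the
# dataset written above (Petastorm writes _common_metadata itself to carry its schema information
# either way). A quick check against the placeholder path used in the sketch:
import os
print(sorted(name for name in os.listdir('/tmp/example_dataset') if name.startswith('_')))
# e.g. ['_SUCCESS', '_common_metadata', '_metadata'] with summary metadata enabled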