in maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala [150:208]
private def createParquetReaderFactory(): PartitionReaderFactory = {
  val readDataSchemaAsJson = readDataSchema.json
  // Wire Spark's Parquet read support and the requested read schema into the Hadoop conf.
  hadoopConf.set(
    ParquetInputFormat.READ_SUPPORT_CLASS,
    classOf[ParquetReadSupport].getName
  )
  hadoopConf.set(
    ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
    readDataSchemaAsJson
  )
  hadoopConf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, readDataSchemaAsJson)
  // Propagate the relevant SQL session settings so executor-side readers behave consistently.
  hadoopConf.set(
    SQLConf.SESSION_LOCAL_TIMEZONE.key,
    sparkSession.sessionState.conf.sessionLocalTimeZone
  )
  hadoopConf.setBoolean(
    SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
    sparkSession.sessionState.conf.nestedSchemaPruningEnabled
  )
  hadoopConf.setBoolean(
    SQLConf.CASE_SENSITIVE.key,
    sparkSession.sessionState.conf.caseSensitiveAnalysis
  )
  ParquetWriteSupport.setSchema(readDataSchema, hadoopConf)
  // Sets flags for `ParquetToSparkSchemaConverter`
  hadoopConf.setBoolean(
    SQLConf.PARQUET_BINARY_AS_STRING.key,
    sparkSession.sessionState.conf.isParquetBinaryAsString
  )
  hadoopConf.setBoolean(
    SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
    sparkSession.sessionState.conf.isParquetINT96AsTimestamp
  )
  hadoopConf.setBoolean(
    SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
    sparkSession.sessionState.conf.legacyParquetNanosAsLong
  )
  hadoopConf.setBoolean(
    SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key,
    sparkSession.sessionState.conf.parquetFieldIdReadEnabled
  )
  // Broadcast the configuration to executors and delegate to Spark's built-in
  // Parquet partition reader factory for the actual file reading.
  val broadcastedConf = sparkSession.sparkContext.broadcast(
    new SerializableConfiguration(hadoopConf)
  )
  val sqlConf = sparkSession.sessionState.conf
  ParquetPartitionReaderFactory(
    sqlConf = sqlConf,
    broadcastedConf = broadcastedConf,
    dataSchema = dataSchema,
    readDataSchema = readDataSchema,
    partitionSchema = readPartitionSchema,
    filters = pushedFilters,
    aggregation = None,
    options = new ParquetOptions(options.asCaseSensitiveMap.asScala.toMap, sqlConf)
  )
}
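
For context, a private helper like this is normally selected from the scan's `createReaderFactory()` override, which chooses a reader factory according to the payload file type recorded in the GraphAr metadata. Below is a minimal sketch of that dispatch; `formatName`, `createCSVReaderFactory`, and `createOrcReaderFactory` are assumed names used for illustration and may not match the actual members of GarScan.

// Illustrative sketch only, not part of the excerpt above: dispatch on the
// declared file format; the member names used here are assumptions.
override def createReaderFactory(): PartitionReaderFactory =
  formatName match {
    case "csv"     => createCSVReaderFactory()
    case "orc"     => createOrcReaderFactory()
    case "parquet" => createParquetReaderFactory()
    case other =>
      throw new IllegalArgumentException(s"Unsupported file format: $other")
  }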