in spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetFileFormat.scala [171:346]
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
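// Propagate the requested read schema and the relevant session SQL configs into the Hadoop
// configuration so that the Parquet readers running on executors see the same settings.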
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requiredSchema.json)
hadoopConf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
hadoopConf.setBoolean(
SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
hadoopConf.setBoolean(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sparkSession.sessionState.conf.legacyParquetNanosAsLong)
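// Broadcast the Hadoop configuration once so each task can read it locally instead of
// re-serializing the full configuration with every partition.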
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enables the vectorized Parquet record reader. It only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
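// Note: even when vectorized reading is enabled in the session, GeoParquet falls back to the
// parquet-mr reader below; this flag is only used to log a warning.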
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val parquetFilterPushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
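// The function returned below runs on executors, once per file split; everything captured
// above must be serializable.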
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
val filePath = file.toPath
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
val sharedConf = broadcastedHadoopConf.value.value
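// Read only the file footer (row groups are skipped) to obtain the Parquet schema and
// key/value metadata.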
val footerFileMetaData =
ParquetFooterReader
.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
.getFileMetaData
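// Determine how datetime and INT96 values written by this file should be rebased between the
// legacy hybrid calendar and the proleptic Gregorian calendar.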
val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
footerFileMetaData.getKeyValueMetaData.get,
int96RebaseModeInRead)
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
parquetFilterPushDownStringPredicate,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseSpec)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter(_))
.reduceOption(FilterApi.and)
} else {
None
}
// Prune the file scan using pushed-down spatial filters and the per-column bounding boxes
// recorded in the GeoParquet metadata.
val shouldScanFile =
GeoParquetMetaData.parseKeyValueMetaData(footerFileMetaData.getKeyValueMetaData).forall {
metadata => spatialFilter.forall(_.evaluate(metadata.columns))
}
if (!shouldScanFile) {
// The entire file is pruned, so we don't need to scan it.
Seq.empty[InternalRow].iterator
} else {
// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to INT96 timestamps
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer of this file. We have to do this per file, as each file in the table may
// have a different writer.
// Define isCreatedByParquetMr as a function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
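// Build a synthetic task attempt context; the Hadoop record reader API requires one even
// though we are not running inside a MapReduce job.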
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
// Try to push down filters when filter push-down is enabled.
// Note that this push-down happens at the row-group level, not for individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
if (enableVectorizedReader) {
logWarning(
"GeoParquet currently does not support the vectorized reader. Falling back to parquet-mr.")
}
logDebug("Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new GeoParquetReadSupport(
convertTz,
enableVectorizedReader = false,
datetimeRebaseSpec,
int96RebaseSpec,
options)
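// Use record-level filtering on top of row-group pruning only when it is enabled and a
// filter was actually pushed down.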
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
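// Attach row-index generation to the reader if the required schema asks for the row index
// metadata column.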
val readerWithRowIndexes =
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, requiredSchema)
val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
try {
readerWithRowIndexes.initialize(split, hadoopAttemptContext)
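// Project the data columns, joined with the partition values when present, into UnsafeRows
// in the order expected downstream.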
val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema)
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
if (partitionSchema.isEmpty) {
// There are no partition columns.
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
} catch {
case e: Throwable =>
// SPARK-23457: In case there is an exception in initialization, close the iterator to
// avoid leaking resources.
iter.close()
throw e
}
}
}
}