in hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala [66:336]
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
hadoopConf.set(
ParquetWriteSupport.SPARK_ROW_SCHEMA,
requiredSchema.json)
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
// For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
// therefore it's safe to do schema projection here
if (!isNullOrEmpty(internalSchemaStr)) {
val prunedInternalSchemaStr =
pruneInternalSchema(internalSchemaStr, requiredSchema)
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
(file: PartitionedFile) => {
assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
val filePath = new Path(new URI(file.filePath))
val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
filePath,
file.start,
file.start + file.length,
file.length,
Array.empty,
null)
val sharedConf = broadcastedHadoopConf.value.value
// Fetch internal schema
val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
// Internal schema has to be pruned at this point
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
var shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
val fileSchema = if (shouldUseInternalSchema) {
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
} else {
null
}
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
createParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
} else {
createParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive)
}
filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter)
.reduceOption(FilterApi.and)
} else {
None
}
// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
} else {
val (implicitTypeChangeInfo, sparkRequestSchema) = HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(hadoopAttemptConf, footerFileMetaData, requiredSchema)
if (!implicitTypeChangeInfo.isEmpty) {
shouldUseInternalSchema = true
hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, sparkRequestSchema.json)
}
implicitTypeChangeInfo
}
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader =
if (shouldUseInternalSchema) {
new Spark30HoodieVectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity,
typeChangeInfos)
} else {
new VectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
}
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
// NOTE: We're making appending of the partitioned values to the rows read from the
// data file configurable
if (shouldAppendPartitionValues) {
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
} else {
vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
}
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz,
enableVectorizedReader = false,
datetimeRebaseMode)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
val iter = new RecordReaderIterator[InternalRow](reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val unsafeProjection = if (typeChangeInfos.isEmpty) {
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
} else {
// find type changed.
val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
if (typeChangeInfos.containsKey(i)) {
StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
} else f
}).toAttributes ++ partitionSchema.toAttributes
val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
if (typeChangeInfos.containsKey(i)) {
val srcType = typeChangeInfos.get(i).getRight
val dstType = typeChangeInfos.get(i).getLeft
val needTimeZone = Cast.needsTimeZone(srcType, dstType)
Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
}
// NOTE: We're making appending of the partitioned values to the rows read from the
// data file configurable
if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
}
}
}