in spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala [73:216]
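
  // Comet scans always produce columnar batches, so batch reads are supported for any schema.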
  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = true
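
  // Builds the function Spark calls once per PartitionedFile. Configuration is resolved
  // here on the driver and captured by the closure; the per-file work (footer reading,
  // filter conversion, reader construction) runs on the executors.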
  override def buildReaderWithPartitionValues(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
    val sqlConf = sparkSession.sessionState.conf
    CometParquetFileFormat.populateConf(sqlConf, hadoopConf)
    val broadcastedHadoopConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
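
    // Capture the relevant session settings here on the driver so the closure below
    // does not need to consult SQLConf on the executors.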
    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
    val useFieldId = CometParquetUtils.readFieldId(sqlConf)
    val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf)
    val pushDownDate = sqlConf.parquetFilterPushDownDate
    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
    val optionsMap = CaseInsensitiveMap[String](options)
    val parquetOptions = new ParquetOptions(optionsMap, sqlConf)
    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
    val parquetFilterPushDown = sqlConf.parquetFilterPushDown

    // Comet-specific configurations
    val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
    val nativeIcebergCompat =
      CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
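
    // The per-file reader function: reads the footer once and reuses it for the rebase
    // spec, filter conversion, and the reader itself, then builds either the native
    // Iceberg-compatible NativeBatchReader or the JVM BatchReader.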
    (file: PartitionedFile) => {
      val sharedConf = broadcastedHadoopConf.value.value
      val footer = FooterReader.readFooter(sharedConf, file)
      val footerFileMetaData = footer.getFileMetaData
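
      // Determine how dates and timestamps written with the legacy hybrid calendar should
      // be rebased on read, based on the file's metadata and the configured read mode.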
      val datetimeRebaseSpec = CometParquetFileFormat.getDatetimeRebaseSpec(
        file,
        requiredSchema,
        sharedConf,
        footerFileMetaData,
        datetimeRebaseModeInRead)
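
      // ParquetFilters converts Spark data source Filters into Parquet filter predicates
      // against this file's actual schema; predicates that cannot be translated are dropped.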
      val parquetSchema = footerFileMetaData.getSchema
      val parquetFilters = new ParquetFilters(
        parquetSchema,
        dataSchema,
        pushDownDate,
        pushDownTimestamp,
        pushDownDecimal,
        pushDownStringPredicate,
        pushDownInFilterThreshold,
        isCaseSensitive,
        datetimeRebaseSpec)
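
      // Choose the reader implementation. Both paths set the converted Parquet predicate
      // on the Hadoop conf; the native path additionally hands a natively-encoded filter
      // directly to the reader.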
      val recordBatchReader =
        if (nativeIcebergCompat) {
          // We still need the predicate in the conf to allow us to generate row indexes based on
          // the actual row groups read
          val pushed = if (parquetFilterPushDown) {
            filters
              // Collects all converted Parquet filter predicates. Notice that not all predicates
              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
              // a `flatMap` is used here.
              .flatMap(parquetFilters.createFilter)
              .reduceOption(FilterApi.and)
          } else {
            None
          }
          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
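
          // Filters converted to the representation consumed by the native scan; passed
          // to NativeBatchReader below (null when nothing could be converted).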
          val pushedNative = if (parquetFilterPushDown) {
            parquetFilters.createNativeFilters(filters)
          } else {
            None
          }

          val batchReader = new NativeBatchReader(
            sharedConf,
            file,
            footer,
            pushedNative.orNull,
            capacity,
            requiredSchema,
            dataSchema,
            isCaseSensitive,
            useFieldId,
            ignoreMissingIds,
            datetimeRebaseSpec.mode == CORRECTED,
            partitionSchema,
            file.partitionValues,
            JavaConverters.mapAsJavaMap(metrics))
          try {
            batchReader.init()
          } catch {
            case e: Throwable =>
              batchReader.close()
              throw e
          }
          batchReader
        } else {
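          // JVM reader path: filters are only pushed down via the Parquet predicate in
          // the Hadoop conf.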
          val pushed = if (parquetFilterPushDown) {
            filters
              // Collects all converted Parquet filter predicates. Notice that not all predicates
              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
              // a `flatMap` is used here.
              .flatMap(parquetFilters.createFilter)
              .reduceOption(FilterApi.and)
          } else {
            None
          }
          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
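
          // Comet's JVM-based columnar reader; same inputs as the native path minus the
          // native filter and the full data schema.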
          val batchReader = new BatchReader(
            sharedConf,
            file,
            footer,
            capacity,
            requiredSchema,
            isCaseSensitive,
            useFieldId,
            ignoreMissingIds,
            datetimeRebaseSpec.mode == CORRECTED,
            partitionSchema,
            file.partitionValues,
            JavaConverters.mapAsJavaMap(metrics))
          try {
            batchReader.init()
          } catch {
            case e: Throwable =>
              batchReader.close()
              throw e
          }
          batchReader
        }
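
      // As in Spark's vectorized Parquet path, the RecordReaderIterator actually yields
      // ColumnarBatches; the cast to Iterator[InternalRow] only satisfies the FileFormat
      // interface, and closing the iterator closes the underlying reader.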
      val iter = new RecordReaderIterator(recordBatchReader)
      try {
        iter.asInstanceOf[Iterator[InternalRow]]
      } catch {
        case e: Throwable =>
          iter.close()
          throw e
      }
    }
  }