in src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala [150:285]
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
  case ScanOperation(projects, filters,
    l @ LogicalRelation(
      fsRelation @ HadoopFsRelation(_: KylinDeltaLogFileIndex, _, _, _, _, _),
      _, table, _)) =>
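    // This strategy only plans scans over a HadoopFsRelation backed by a KylinDeltaLogFileIndex;
    // any other plan falls through to the remaining strategies (see the default case below).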
    // Filters on this relation fall into four categories based on where we can use them to avoid
    // reading unneeded data:
    //  - partition keys only - used to prune directories to read
    //  - bucket keys only - optionally used to prune files to read
    //  - keys stored in the data only - optionally used to skip groups of data in files
    //  - filters that need to be evaluated again after the scan
    val filterSet = ExpressionSet(filters)
    val normalizedFilters = DataSourceStrategy.normalizeExprs(
      filters.filter(_.deterministic), l.output)
    val partitionColumns =
      l.resolve(
        fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
    val partitionSet = AttributeSet(partitionColumns)
    // these partitionKeyFilters should be the same as the ones applied in
    // PruneFileSourcePartitions
    val partitionKeyFilters = DataSourceStrategy.getPushedDownFilters(partitionColumns,
      normalizedFilters)
    // subquery expressions are filtered out because they can't be used to prune buckets or be
    // pushed down as data filters, yet they would be executed
    val normalizedFiltersWithoutSubqueries =
      normalizedFilters.filterNot(SubqueryExpression.hasSubquery)
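    // When the relation is bucketed and bucket pruning is enabled, compute the set of buckets
    // that can possibly match the remaining filters so that files of other buckets are skipped.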
    val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec
    val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
      genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec.get)
    } else {
      None
    }
    val dataColumns =
      l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
    // Partition keys are not available in the statistics of the files.
    // `dataColumns` might contain partition columns, so we need to filter them out.
    val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionColumns.contains)
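    // Predicates that also reference partition columns are reduced to the sub-predicates that can
    // be expressed with data columns only, so that at least that part can be pushed to the scan.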
    val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
      if (f.references.intersect(partitionSet).nonEmpty) {
        extractPredicatesWithinOutputSet(f, AttributeSet(dataColumnsWithoutPartitionCols))
      } else {
        Some(f)
      }
    }
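    // Translate the surviving data filters into data source Filter instances; predicates that
    // cannot be translated are simply not pushed down and remain in the post-scan filters.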
    val supportNestedPredicatePushdown =
      DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
    val pushedFilters = dataFilters
      .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
    logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")
    // Predicates with both partition keys and attributes need to be evaluated after the scan.
    val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
    logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
    val filterAttributes = AttributeSet(afterScanFilters)
    val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
    val requiredAttributes = AttributeSet(requiredExpressions)
    val readDataColumns =
      dataColumns
        .filter(requiredAttributes.contains)
        .filterNot(partitionColumns.contains)
    val outputSchema = readDataColumns.toStructType
    logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
    val metadataStructOpt = l.output.collectFirst {
      case FileSourceMetadataAttribute(attr) => attr
    }
    val metadataColumns = metadataStructOpt.map { metadataStruct =>
      metadataStruct.dataType.asInstanceOf[StructType].fields.map { field =>
        FileSourceMetadataAttribute(field.name, field.dataType)
      }.toSeq
    }.getOrElse(Seq.empty)
    // outputAttributes should also include the metadata columns at the very end
    val outputAttributes = readDataColumns ++ partitionColumns ++ metadataColumns
    // inject local pruning //
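    // listFiles performs the Delta-log based file pruning for this combination of partition and
    // data filters; DeltaExpressionCache is then consulted under the same filter key to obtain
    // the number of source rows left to scan after pruning.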
    val filePruner = fsRelation.location.asInstanceOf[KylinDeltaLogFileIndex]
    filePruner.listFiles(partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq)
    val cacheExp = filePruner.DeltaExpressionCache.getIfPresent((partitionKeyFilters.iterator.toSeq,
      dataFilters.iterator.toSeq))
    val sourceScanRows = if (cacheExp != null) {
      cacheExp._2
    } else 0L
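    // Record the post-pruning scanned-row count in the query metrics.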
    QueryContext.current().getMetrics.addAccumSourceScanRows(sourceScanRows)
    // inject end //
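    // Build the physical scan over the pruned file listing.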
    val scan =
      KylinStorageScanExec(
        fsRelation,
        outputAttributes,
        outputSchema,
        partitionKeyFilters.toSeq,
        bucketSet,
        None,
        dataFilters,
        table.map(_.identifier))
    // extra Project node: wrap flat metadata columns into a metadata struct
    val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
      val metadataAlias =
        Alias(CreateStruct(metadataColumns), METADATA_NAME)(exprId = metadataStruct.exprId)
      execution.ProjectExec(
        scan.output.dropRight(metadataColumns.length) :+ metadataAlias, scan)
    }.getOrElse(scan)
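    // Re-evaluate the predicates that could not be fully pushed down (e.g. those mixing partition
    // and data columns) on top of the scan.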
    val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
    val withFilter = afterScanFilter
      .map(execution.FilterExec(_, withMetadataProjections))
      .getOrElse(withMetadataProjections)
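    // Add a Project only when the requested projection differs from the filter/scan output.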
    val withProjections = if (projects == withFilter.output) {
      withFilter
    } else {
      execution.ProjectExec(projects, withFilter)
    }
    withProjections :: Nil
  case _ => Nil
}