in src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala [223:318]
/**
 * Lists partition directories for this layout after running every pruning stage:
 * segment pruning (empty-segment skip, time-partition, dimension range, derived
 * dimension range), per-segment file-status fetching, and shard pruning.
 * Results are memoized in `cached` keyed by the filter triple, so repeated calls
 * with the same filters return the cached value immediately.
 *
 * @param partitionFilters partition-level filter expressions (participate only in the cache key here)
 * @param dataFilters      data filter expressions driving time-partition, dimension and shard pruning
 * @param derivedFilters   filters on derived dimensions, applied as an extra dimension-range pruning pass
 * @return a single-element Seq wrapping one PartitionDirectory with all surviving files,
 *         or an empty Seq when every segment was pruned away
 */
def listFilesInternal(partitionFilters: Seq[Expression], dataFilters: Seq[Expression],
    derivedFilters: Seq[Expression]): Seq[PartitionDirectory] = {
  // Build the cache key once instead of re-constructing the tuple on every access.
  val cacheKey = (partitionFilters, dataFilters, derivedFilters)
  // Single lookup instead of containsKey + get: avoids a second hash probe and the
  // check-then-act window between the two calls on a concurrent map.
  val cachedEntry = cached.get(cacheKey)
  if (cachedEntry != null) {
    return cachedEntry._1
  }
  require(isResolved)
  val timePartitionFilters = getSpecFilter(dataFilters, timePartitionColumn)
  val dimFilters = getDimFilter(dataFilters, timePartitionColumn, shardByColumn)
  val derivedDimFilters = getDimFilter(derivedFilters, timePartitionColumn, shardByColumn)
  logInfoIf(timePartitionFilters.nonEmpty)(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")
  // segment pruning
  val project = dataflow.getProject
  val projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv).getProject(project).getConfig
  var selected = prunedSegmentDirs
  if (projectKylinConfig.isSkipEmptySegments) {
    selected = afterPruning("pruning empty segment", null, selected) {
      (_, segDirs) => pruneEmptySegments(segDirs)
    }
  }
  selected = afterPruning("pruning segment with time partition", timePartitionFilters, selected) {
    pruneSegments
  }
  val filteredSizeAfterTimePartition = selected.size
  var filteredSizeAfterDimensionFilter = selected.size
  if (projectKylinConfig.isDimensionRangeFilterEnabled) {
    selected = afterPruning("pruning segment with dimension range", dimFilters, selected) {
      pruneSegmentsDimRange
    }
    filteredSizeAfterDimensionFilter = selected.size
    selected = afterPruning("pruning segment with derived dimension range", derivedDimFilters, selected) {
      pruneSegmentsDimRange
    }
  }
  QueryContext.current().record("seg_pruning")
  QueryContext.current().getMetrics.setSegCount(selected.size)
  logInfo(s"Segment Num: Before filter: ${prunedSegmentDirs.size}, After time partition filter: " +
    s"$filteredSizeAfterTimePartition, After dimension filter: ${filteredSizeAfterDimensionFilter}, " +
    s"After derived dimension filter: ${selected.size}.")
  // tolerate ready & empty segments: keep only segments that actually contain this layout
  selected = selected.filter(
    seg => dataflow.getSegment(seg.segmentID).getLayoutsMap.containsKey(layout.getId))
  selected = selected.par.map { e =>
    val logString = s"[fetch file status for Segment ID: ${e.segmentID}; Partition Num: ${e.partitions.size}]"
    logTime(logString, true) {
      // Hoist the segment lookup out of the per-partition loop; it is loop-invariant.
      val segment = dataflow.getSegment(e.segmentID)
      var statuses = Seq.empty[FileStatus]
      e.partitions.foreach(id => {
        val bucketId = segment.getBucketId(layout.getId, id)
        val childDir = if (bucketId == null) id else bucketId
        val path = new Path(toPath(e.segmentID) + s"/${childDir}")
        statuses = statuses ++ getFileStatues(e.segmentID, path)
      })
      if (statuses.isEmpty) {
        // Fall back to listing the segment root when no partition directory yielded files.
        statuses = statuses ++ getFileStatues(e.segmentID, new Path(toPath(e.segmentID)))
      }
      SegmentDirectory(e.segmentID, e.partitions, statuses)
    }
  }.toIterator.toSeq
  QueryContext.current().record("fetch_file_status")
  // shards pruning
  selected = afterPruning("pruning shard", dataFilters, selected) {
    pruneShards
  }
  QueryContext.current().record("shard_pruning")
  // Flatten the surviving files exactly once (the file list is reused for the
  // file-count metric, the total-size computation, and the returned directory).
  val selectedFiles = selected.flatMap(_.files)
  QueryContext.current().getMetrics.setFileCount(selectedFiles.size)
  val totalFileSize = selectedFiles.map(_.getLen).sum
  val sourceRows = selected.map { seg =>
    val segment = dataflow.getSegment(seg.segmentID)
    val dataLayout = segment.getLayout(layout.getId)
    // A missing layout contributes zero rows rather than failing the whole scan.
    val layoutRows = if (dataLayout == null) 0 else dataLayout.getRows
    logInfo(s"Source scan rows: Query Id: ${QueryContext.current().getQueryId}, Segment Id: ${seg.segmentID}, " +
      s"Layout Id: ${layout.getId}, rows: $layoutRows.")
    layoutRows
  }.sum
  setShufflePartitions(totalFileSize, sourceRows, session)
  setFilesMaxPartitionBytes(totalFileSize, sourceRows, session)
  // Single cache-put path: the empty and non-empty cases differ only in the value shape.
  val value =
    if (selected.isEmpty) Seq.empty[PartitionDirectory]
    else Seq(PartitionDirectory(InternalRow.empty, selectedFiles))
  cached.put(cacheKey, (value, sourceRows))
  value
}