def listFilesInternal()

in src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala [223:318]


  /**
   * Lists the files of the selected layout that survive every pruning stage
   * (empty-segment, time-partition, dimension-range, derived-dimension-range
   * and shard pruning) for the given filter expressions.
   *
   * Results are memoized in `cached`, keyed by the exact triple of filter
   * sequences; the cached tuple also carries the total source-row count.
   * NOTE(review): assumes `cached` is a concurrent java.util.Map whose values
   * are never null — verify against the field's declaration.
   *
   * @param partitionFilters partition-column filters (used here only as part
   *                         of the cache key)
   * @param dataFilters      pushed-down data filters; drive time-partition,
   *                         dimension-range and shard pruning
   * @param derivedFilters   filters on derived dimensions; drive the derived
   *                         dimension-range pruning pass
   * @return a single-element Seq wrapping all surviving files, or `Seq.empty`
   *         when every segment was pruned away
   */
  def listFilesInternal(partitionFilters: Seq[Expression], dataFilters: Seq[Expression],
                        derivedFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    // Build the cache key once. A single get() replaces the original
    // containsKey + get pair: one hash probe instead of two, and no window in
    // which a concurrent eviction between the two calls could trigger an NPE.
    val cacheKey = (partitionFilters, dataFilters, derivedFilters)
    Option(cached.get(cacheKey)) match {
      case Some((files, _)) =>
        files
      case None =>
        require(isResolved)
        val timePartitionFilters = getSpecFilter(dataFilters, timePartitionColumn)
        val dimFilters = getDimFilter(dataFilters, timePartitionColumn, shardByColumn)
        val derivedDimFilters = getDimFilter(derivedFilters, timePartitionColumn, shardByColumn)
        logInfoIf(timePartitionFilters.nonEmpty)(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")

        // segment pruning
        val project = dataflow.getProject
        val projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv).getProject(project).getConfig

        var selected = prunedSegmentDirs
        if (projectKylinConfig.isSkipEmptySegments) {
          selected = afterPruning("pruning empty segment", null, selected) {
            (_, segDirs) => pruneEmptySegments(segDirs)
          }
        }

        selected = afterPruning("pruning segment with time partition", timePartitionFilters, selected) {
          pruneSegments
        }
        val filteredSizeAfterTimePartition = selected.size
        // Stays at the time-partition size when dimension-range filtering is disabled.
        var filteredSizeAfterDimensionFilter = selected.size

        if (projectKylinConfig.isDimensionRangeFilterEnabled) {
          selected = afterPruning("pruning segment with dimension range", dimFilters, selected) {
            pruneSegmentsDimRange
          }
          filteredSizeAfterDimensionFilter = selected.size
          selected = afterPruning("pruning segment with derived dimension range", derivedDimFilters, selected) {
            pruneSegmentsDimRange
          }
        }

        QueryContext.current().record("seg_pruning")
        QueryContext.current().getMetrics.setSegCount(selected.size)

        logInfo(s"Segment Num: Before filter: ${prunedSegmentDirs.size}, After time partition filter: " +
          s"$filteredSizeAfterTimePartition, After dimension filter: ${filteredSizeAfterDimensionFilter}, " +
          s"After derived dimension filter: ${selected.size}.")

        selected = selected.filter( // tolerate for ready & empty segment
          seg => dataflow.getSegment(seg.segmentID).getLayoutsMap.containsKey(layout.getId))

        // Fetch file statuses for every surviving segment in parallel.
        selected = selected.par.map { e =>
          val logString = s"[fetch file status for Segment ID: ${e.segmentID}; Partition Num: ${e.partitions.size}]"
          logTime(logString, true) {
            var statuses = Seq.empty[FileStatus]
            e.partitions.foreach(id => {
              val bucketId = dataflow.getSegment(e.segmentID).getBucketId(layout.getId, id)
              val childDir = if (bucketId == null) id else bucketId
              val path = new Path(toPath(e.segmentID) + s"/${childDir}")
              statuses = statuses ++ getFileStatues(e.segmentID, path)
            })
            // Fall back to the segment root when no partition directory yielded files.
            if (statuses.isEmpty) {
              statuses = statuses ++ getFileStatues(e.segmentID, new Path(toPath(e.segmentID)))
            }

            SegmentDirectory(e.segmentID, e.partitions, statuses)
          }
        }.toIterator.toSeq
        QueryContext.current().record("fetch_file_status")
        // shards pruning
        selected = afterPruning("pruning shard", dataFilters, selected) {
          pruneShards
        }
        QueryContext.current().record("shard_pruning")

        // Materialize the surviving file list once; the original flat-mapped
        // `selected` three separate times (count, size, final directory).
        val allFiles = selected.flatMap(_.files)
        QueryContext.current().getMetrics.setFileCount(allFiles.size)
        val totalFileSize = allFiles.map(_.getLen).sum
        val sourceRows = selected.map(seg => {
          val segment = dataflow.getSegment(seg.segmentID)
          val dataLayout = segment.getLayout(layout.getId)
          // A missing layout in this segment contributes zero rows.
          val layoutRows = if (dataLayout == null) 0 else dataLayout.getRows
          logInfo(s"Source scan rows: Query Id: ${QueryContext.current().getQueryId}, Segment Id: ${seg.segmentID}, " +
            s"Layout Id: ${layout.getId}, rows: $layoutRows.")
          layoutRows
        }).sum
        setShufflePartitions(totalFileSize, sourceRows, session)
        setFilesMaxPartitionBytes(totalFileSize, sourceRows, session)

        // Build, cache and return the result. An empty selection caches
        // Seq.empty rather than a directory holding zero files (as before);
        // a single put replaces the duplicated branches of the original.
        val value =
          if (selected.isEmpty) Seq.empty[PartitionDirectory]
          else Seq(PartitionDirectory(InternalRow.empty, allFiles))
        cached.put(cacheKey, (value, sourceRows))
        value
    }
  }