def apply()

in src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala [56:153]


  /**
   * Plans a physical scan for a [[LogicalRelation]] whose [[HadoopFsRelation]] location is a
   * Kylin `FilePruner`. This appears adapted from Spark's built-in `FileSourceStrategy`, with
   * Kylin-specific additions: segment pruning (`FilePruner.resolve` / `listFiles`), scan-row
   * accounting via `QueryContext`, and a `KylinFileSourceScanExec` (carrying a shard spec) in
   * place of the stock `FileSourceScanExec`.
   *
   * @param plan the logical plan to translate; only a `PhysicalOperation` over a
   *             FilePruner-backed relation is handled
   * @return a single-element list containing the scan (optionally wrapped in `FilterExec` /
   *         `ProjectExec`), or `Nil` when this strategy does not apply
   */
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projects, filters,
    l@LogicalRelation(
    fsRelation@HadoopFsRelation(
    _: FilePruner,
    _,
    _,
    _,
    _,
    _),
    _, table, _)) =>
      // Filters on this relation fall into four categories based on where we can use them to avoid
      // reading unneeded data:
      //  - partition keys only - used to prune directories to read
      //  - bucket keys only - optionally used to prune files to read
      //  - keys stored in the data only - optionally used to skip groups of data in files
      //  - filters that need to be evaluated again after the scan
      val filterSet = ExpressionSet(filters)

      // The attribute name of predicate could be different than the one in schema in case of
      // case insensitive, we should change them to match the one in schema, so we do not need to
      // worry about case sensitivity anymore.
      // NOTE: `.get` assumes every attribute in a pushed filter resolves against `l.output`,
      // which PhysicalOperation should guarantee for filters on this relation.
      val normalizedFilters = filters.map { e =>
        e transform {
          case a: AttributeReference =>
            a.withName(l.output.find(_.semanticEquals(a)).get.name)
        }
      }

      // Resolve partition columns against the relation's output and collect the filters that
      // reference partition columns only. Subquery filters are excluded: they cannot be
      // evaluated at planning time for directory pruning.
      val partitionColumns = l.resolve(
        fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
      val partitionSet = AttributeSet(partitionColumns)
      val partitionKeyFilters =
        ExpressionSet(normalizedFilters
          .filterNot(SubqueryExpression.hasSubquery)
          .filter(_.references.subsetOf(partitionSet)))

      // inject segment pruning //
      // Side effect: lets the pruner resolve its columns against this relation. Presumably this
      // must run before `listFiles` below — confirm against FilePruner.
      val filePruner = fsRelation.location.asInstanceOf[FilePruner]
      filePruner.resolve(l, fsRelation.sparkSession.sessionState.analyzer.resolver)
      //      inject end       //

      logInfoIf(partitionKeyFilters.nonEmpty)(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")

      val dataColumns =
        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)

      // Partition keys are not available in the statistics of the files.
      val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)

      // Predicates with both partition keys and attributes need to be evaluated after the scan.
      // Partition-only filters with empty references (constant predicates) are deliberately kept
      // in the post-scan set, mirroring upstream FileSourceStrategy.
      val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
      logInfoIf(afterScanFilters.nonEmpty)(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")

      val filterAttributes = AttributeSet(afterScanFilters)
      val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
      val requiredAttributes = AttributeSet(requiredExpressions)

      // Only read data columns actually needed by post-scan filters/projections; partition
      // columns are excluded since they come from directory metadata, not file contents.
      val readDataColumns =
        dataColumns
          .filter(requiredAttributes.contains)
          .filterNot(partitionColumns.contains)
      val outputSchema = readDataColumns.toStructType
      logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

      val outputAttributes = readDataColumns ++ partitionColumns
      // Side effect: triggers segment/file pruning inside the pruner (timed). The return value
      // is discarded; results are presumably memoized in `filePruner.cached` — TODO confirm.
      logTime("listFiles", debug = true) {
        filePruner.listFiles(partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq)
      }
      // `cached` looks like a Java-style map (null on miss) keyed by the same filter triple used
      // above; `_2` appears to be the pre-computed source-scan row count — verify in FilePruner.
      val cacheV = filePruner.cached.get((partitionKeyFilters.iterator.toSeq,
        dataFilters.iterator.toSeq, Seq.empty[Expression]))
      val sourceScanRows = if (cacheV != null) {
        cacheV._2
      } else 0L
      // Record the scan-row estimate in the per-query metrics.
      QueryContext.current().getMetrics.addAccumSourceScanRows(sourceScanRows)
      val scan =
        new KylinFileSourceScanExec(
          fsRelation,
          outputAttributes,
          outputSchema,
          partitionKeyFilters.toSeq,
          filePruner.getShardSpec,
          None,
          dataFilters,
          table.map(_.identifier),
          false,
          sourceScanRows)

      // AND together the remaining predicates and wrap the scan in a FilterExec only if any
      // remain; add a ProjectExec only if the projection differs from the child's output.
      val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
      val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
      val withProjections = if (projects == withFilter.output) {
        withFilter
      } else {
        execution.ProjectExec(projects, withFilter)
      }
      withProjections :: Nil
    case _ => Nil
  }