def apply()

in src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinDeltaSourceStrategy.scala [150:285]

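In outline: this rule matches a scan over a HadoopFsRelation whose file index is a
KylinDeltaLogFileIndex, splits the filters into partition, bucket, data, and
after-scan predicates (mirroring Spark's FileSourceStrategy), injects Kylin's
Delta-log file pruning plus a scanned-row metric, and plans a KylinStorageScanExec
wrapped in the necessary Filter/Project nodes.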

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ScanOperation(projects, filters,
        l @ LogicalRelation(
          fsRelation @ HadoopFsRelation(_: KylinDeltaLogFileIndex, _, _, _, _, _),
          _, table, _)) =>
      // Filters on this relation fall into four categories based on where we can use them to avoid
      // reading unneeded data:
      //  - partition keys only - used to prune directories to read
      //  - bucket keys only - optionally used to prune files to read
      //  - keys stored in the data only - optionally used to skip groups of data in files
      //  - filters that need to be evaluated again after the scan
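      // For example, given WHERE dt = '2024-01-01' AND id = 7 AND rand() < 0.5 on a
      // table partitioned by dt: `dt = '2024-01-01'` prunes directories, `id = 7` can
      // be pushed down to skip groups of data in files, and the non-deterministic
      // rand() < 0.5 can only be evaluated after the scan.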
      val filterSet = ExpressionSet(filters)

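      // Only deterministic predicates are eligible for pushdown; normalization
      // rewrites attribute references so they line up with the relation's output
      // schema before any matching against partition or data columns.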
      val normalizedFilters = DataSourceStrategy.normalizeExprs(
        filters.filter(_.deterministic), l.output)

      val partitionColumns =
        l.resolve(
          fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
      val partitionSet = AttributeSet(partitionColumns)

      // these partitionKeyFilters should be the same as the ones evaluated in
      // PruneFileSourcePartitions
      val partitionKeyFilters = DataSourceStrategy.getPushedDownFilters(partitionColumns,
        normalizedFilters)

      // subquery expressions are filtered out because they can't be used to prune buckets or pushed
      // down as data filters, yet they would still be evaluated after the scan
      val normalizedFiltersWithoutSubqueries =
        normalizedFilters.filterNot(SubqueryExpression.hasSubquery)

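      // Bucket pruning: when the relation is bucketed and a filter constrains the
      // bucket column (e.g. `id = 7` on a table bucketed by id), genBucketSet
      // computes the bit set of matching bucket ids so files belonging to other
      // buckets are skipped entirely.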
      val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec
      val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
        genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec.get)
      } else {
        None
      }

      val dataColumns =
        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)

      // Partition keys are not available in the statistics of the files.
      // `dataColumns` might have partition columns, we need to filter them out.
      val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionColumns.contains)
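      // For predicates that mix partition and data columns, keep only the conjuncts
      // that reference data columns alone; e.g. `dt = '2024-01-01' AND id > 0`
      // contributes `id > 0` as a candidate data filter.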
      val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
        if (f.references.intersect(partitionSet).nonEmpty) {
          extractPredicatesWithinOutputSet(f, AttributeSet(dataColumnsWithoutPartitionCols))
        } else {
          Some(f)
        }
      }
      val supportNestedPredicatePushdown =
        DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
      val pushedFilters = dataFilters
        .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
      logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")

      // Predicates that reference both partition keys and data attributes need to be
      // evaluated after the scan.
      val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
      logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")

      val filterAttributes = AttributeSet(afterScanFilters)
      val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
      val requiredAttributes = AttributeSet(requiredExpressions)

      val readDataColumns =
        dataColumns
          .filter(requiredAttributes.contains)
          .filterNot(partitionColumns.contains)
      val outputSchema = readDataColumns.toStructType
      logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

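      // If the query references the hidden _metadata struct, recover its attribute
      // so the scan can emit the flat metadata columns and a Project (added below)
      // can reassemble them into the struct.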
      val metadataStructOpt = l.output.collectFirst {
        case FileSourceMetadataAttribute(attr) => attr
      }

      val metadataColumns = metadataStructOpt.map { metadataStruct =>
        metadataStruct.dataType.asInstanceOf[StructType].fields.map { field =>
          FileSourceMetadataAttribute(field.name, field.dataType)
        }.toSeq
      }.getOrElse(Seq.empty)

      // outputAttributes should also include the metadata columns at the very end
      val outputAttributes = readDataColumns ++ partitionColumns ++ metadataColumns

      // inject local pruning //
      // listFiles runs Kylin's Delta-log based file pruning for this combination of
      // partition and data filters; the result is memoized in DeltaExpressionCache,
      // keyed by (partitionKeyFilters, dataFilters), and the cached tuple's second
      // element is the number of source rows the scan will read.
      val filePruner = fsRelation.location.asInstanceOf[KylinDeltaLogFileIndex]
      filePruner.listFiles(partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq)
      val cacheExp = filePruner.DeltaExpressionCache.getIfPresent(
        (partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq))
      val sourceScanRows = if (cacheExp != null) cacheExp._2 else 0L

      QueryContext.current().getMetrics.addAccumSourceScanRows(sourceScanRows)
      // inject end //

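      // Build the physical scan. KylinStorageScanExec stands in for Spark's
      // FileSourceScanExec here, carrying the pruned partition filters, the
      // optional bucket set, and the pushed-down data filters.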
      val scan =
        KylinStorageScanExec(
          fsRelation,
          outputAttributes,
          outputSchema,
          partitionKeyFilters.toSeq,
          bucketSet,
          None,
          dataFilters,
          table.map(_.identifier))

      // extra Project node: wrap flat metadata columns to a metadata struct
      val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
        val metadataAlias =
          Alias(CreateStruct(metadataColumns), METADATA_NAME)(exprId = metadataStruct.exprId)
        execution.ProjectExec(
          scan.output.dropRight(metadataColumns.length) :+ metadataAlias, scan)
      }.getOrElse(scan)

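      // Conjoin any remaining after-scan filters into a single predicate on top of
      // the scan, then add a Project only when the projection differs from the
      // filtered output.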
      val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
      val withFilter = afterScanFilter
        .map(execution.FilterExec(_, withMetadataProjections))
        .getOrElse(withMetadataProjections)
      val withProjections = if (projects == withFilter.output) {
        withFilter
      } else {
        execution.ProjectExec(projects, withFilter)
      }

      withProjections :: Nil

    case _ => Nil
  }
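
A minimal sketch of how such a strategy is typically injected into the planner
(hypothetical wiring — Kylin's real bootstrap code and the strategy's constructor
may differ; `experimental.extraStrategies` itself is standard Spark API):

  // Assumes KylinDeltaSourceStrategy is a concrete SparkStrategy with a no-arg
  // constructor; prepend it so it gets first chance at planning scans over
  // relations backed by KylinDeltaLogFileIndex.
  spark.experimental.extraStrategies =
    new KylinDeltaSourceStrategy +: spark.experimental.extraStrategies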