override def apply(plan: LogicalPlan): Seq[SparkPlan]

in sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala [111:577]
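
This strategy's apply pattern-matches the optimized logical plan and maps DataSource V2 scans, streaming scans, writes, and catalog/DDL commands onto their physical exec nodes; plans it does not recognize fall through to Nil so other strategies can handle them.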


  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
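    // Scan that falls back to the V1 read path: validate that the V1 relation reports the same
    // schema as the V2 scan, build the RDD through the V1 relation, and wrap it in a
    // RowDataSourceScanExec.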
    case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
      v2Relation, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _, _)) =>
      val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
      if (v1Relation.schema != scan.readSchema()) {
        throw QueryExecutionErrors.fallbackV1RelationReportsInconsistentSchemaError(
          scan.readSchema(), v1Relation.schema)
      }
      val rdd = v1Relation.buildScan()
      val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)

      val catalogName = v2Relation.catalog.map(_.name())
      val tableIdentifier = v2Relation.identifier.flatMap(_.asTableIdentifierOpt(catalogName))

      val dsScan = RowDataSourceScanExec(
        output,
        output.toStructType,
        Set.empty,
        pushed.toSet,
        pushedDownOperators,
        unsafeRowRDD,
        v1Relation,
        None,
        tableIdentifier)
      DataSourceV2Strategy.withProjectAndFilter(
        project, filters, dsScan, needsUnsafeConversion = false) :: Nil

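    // A LocalScan exposes its rows directly on the driver, so plan it as a local table scan.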
    case PhysicalOperation(project, filters,
        DataSourceV2ScanRelation(_, scan: LocalScan, output, _, _)) =>
      val localScanExec = LocalTableScanExec(output, scan.rows().toImmutableArraySeq, None)
      DataSourceV2Strategy.withProjectAndFilter(
        project, filters, localScanExec, needsUnsafeConversion = false) :: Nil

    case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
      // Projection and filters were already pushed down in the optimizer.
      // This uses PhysicalOperation to get the projection and to ensure that, if the batch scan
      // does not support columnar execution, a projection is added to convert rows to UnsafeRow.
      val (runtimeFilters, postScanFilters) = filters.partition {
        case _: DynamicPruning => true
        case _ => false
      }
      val batchExec = BatchScanExec(relation.output, relation.scan, runtimeFilters,
        relation.ordering, relation.relation.table,
        StoragePartitionJoinParams(relation.keyGroupedPartitioning))
      DataSourceV2Strategy.withProjectAndFilter(
        project, postScanFilters, batchExec, !batchExec.supportsColumnar) :: Nil

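    // Streaming scan with both start and end offsets resolved: plan a micro-batch scan.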
    case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
      if r.startOffset.isDefined && r.endOffset.isDefined =>

      val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
      val scanExec = MicroBatchScanExec(
        r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)

      // Add a Project here to make sure we produce unsafe rows.
      DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil

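    // Streaming scan with only a start offset: plan a continuous scan and eagerly resolve its
    // input partitions.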
    case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
      if r.startOffset.isDefined && r.endOffset.isEmpty =>

      val continuousStream = r.stream.asInstanceOf[ContinuousStream]
      val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
      // initialize partitions
      scanExec.inputPartitions

      // Add a Project here to make sure we produce unsafe rows.
      DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil

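    // Generic v2 write: pass a callback that uncaches any plans referencing the target relation.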
    case WriteToDataSourceV2(relationOpt, writer, query, customMetrics) =>
      val invalidateCacheFunc: () => Unit = () => relationOpt match {
        case Some(r) => session.sharedState.cacheManager.uncacheQuery(session, r, cascade = true)
        case None => ()
      }
      WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil

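    // CREATE TABLE: validate default values, table constraints, generated columns and identity
    // columns against the target catalog before building the exec node.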
    case c @ CreateTable(ResolvedIdentifier(catalog, ident), columns, partitioning,
        tableSpec: TableSpec, ifNotExists) =>
      val tableCatalog = catalog.asTableCatalog
      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, tableCatalog, ident)
      ResolveTableConstraints.validateCatalogForTableConstraint(
        tableSpec.constraints, tableCatalog, ident)
      val statementType = "CREATE TABLE"
      GeneratedColumn.validateGeneratedColumns(
        c.tableSchema, tableCatalog, ident, statementType)
      IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)

      CreateTableExec(
        catalog.asTableCatalog,
        ident,
        columns.map(_.toV2Column(statementType)).toArray,
        partitioning,
        qualifyLocInTableSpec(tableSpec),
        ifNotExists) :: Nil

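    // CREATE TABLE AS SELECT: use the atomic variant when the catalog can stage the new table.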
    case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec: TableSpec,
        options, ifNotExists, true) =>
      catalog match {
        case staging: StagingTableCatalog =>
          AtomicCreateTableAsSelectExec(staging, ident, parts, query,
            qualifyLocInTableSpec(tableSpec), options, ifNotExists) :: Nil
        case _ =>
          CreateTableAsSelectExec(catalog.asTableCatalog, ident, parts, query,
            qualifyLocInTableSpec(tableSpec), options, ifNotExists) :: Nil
      }

    case RefreshTable(r: ResolvedTable) =>
      RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil

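    // REPLACE TABLE: run the same column validations as CREATE TABLE, then pick the atomic exec
    // when the catalog supports staging the replacement.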
    case c @ ReplaceTable(
        ResolvedIdentifier(catalog, ident), columns, parts, tableSpec: TableSpec, orCreate) =>
      val tableCatalog = catalog.asTableCatalog
      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, tableCatalog, ident)
      ResolveTableConstraints.validateCatalogForTableConstraint(
        tableSpec.constraints, tableCatalog, ident)
      val statementType = "REPLACE TABLE"
      GeneratedColumn.validateGeneratedColumns(
        c.tableSchema, tableCatalog, ident, statementType)
      IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)

      val v2Columns = columns.map(_.toV2Column(statementType)).toArray
      catalog match {
        case staging: StagingTableCatalog =>
          AtomicReplaceTableExec(staging, ident, v2Columns, parts,
            qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
        case _ =>
          ReplaceTableExec(tableCatalog, ident, v2Columns, parts,
            qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
      }

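    // REPLACE TABLE AS SELECT: atomic when the catalog supports staging, otherwise the
    // non-atomic variant.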
    case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident),
        parts, query, tableSpec: TableSpec, options, orCreate, true) =>
      catalog match {
        case staging: StagingTableCatalog =>
          AtomicReplaceTableAsSelectExec(
            staging,
            ident,
            parts,
            query,
            qualifyLocInTableSpec(tableSpec),
            options,
            orCreate = orCreate,
            invalidateCache) :: Nil
        case _ =>
          ReplaceTableAsSelectExec(
            catalog.asTableCatalog,
            ident,
            parts,
            query,
            qualifyLocInTableSpec(tableSpec),
            options,
            orCreate = orCreate,
            invalidateCache) :: Nil
      }

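    // Append to a table that advertises V1_BATCH_WRITE: the resolved write must be a V1Write,
    // and the analyzed query is passed to the V1 exec node.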
    case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
        _, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
      write match {
        case v1Write: V1Write =>
          assert(analyzedQuery.isDefined)
          AppendDataExecV1(v1, analyzedQuery.get, refreshCache(r), v1Write) :: Nil
        case v2Write =>
          throw QueryCompilationErrors.batchWriteCapabilityError(
            v1, v2Write.getClass.getName, classOf[V1Write].getName)
      }

    case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) =>
      AppendDataExec(planLater(query), refreshCache(r), write) :: Nil

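    // Overwrite-by-expression through the V1 write path, mirroring the append fallback above.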
    case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
        _, _, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
      write match {
        case v1Write: V1Write =>
          assert(analyzedQuery.isDefined)
          OverwriteByExpressionExecV1(v1, analyzedQuery.get, refreshCache(r), v1Write) :: Nil
        case v2Write =>
          throw QueryCompilationErrors.batchWriteCapabilityError(
            v1, v2Write.getClass.getName, classOf[V1Write].getName)
      }

    case OverwriteByExpression(
        r: DataSourceV2Relation, _, query, _, _, Some(write), _) =>
      OverwriteByExpressionExec(planLater(query), refreshCache(r), write) :: Nil

    case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _, _, Some(write)) =>
      OverwritePartitionsDynamicExec(planLater(query), refreshCache(r), write) :: Nil

    case DeleteFromTableWithFilters(r: DataSourceV2Relation, filters) =>
      DeleteFromTableExec(r.table.asDeletable, filters.toArray, refreshCache(r)) :: Nil

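    // DELETE whose condition still needs to be translated: every conjunct must convert to a
    // data source filter, otherwise the delete cannot be delegated to the table.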
    case DeleteFromTable(relation, condition) =>
      relation match {
        case DataSourceV2ScanRelation(r, _, output, _, _) =>
          val table = r.table
          if (SubqueryExpression.hasSubquery(condition)) {
            throw QueryCompilationErrors.unsupportedDeleteByConditionWithSubqueryError(condition)
          }
          // Fail if any filter cannot be converted;
          // correctness depends on removing all matching data.
          val filters = DataSourceStrategy.normalizeExprs(Seq(condition), output)
              .flatMap(splitConjunctivePredicates(_).map {
                f => DataSourceV2Strategy.translateFilterV2(f).getOrElse(
                  throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(f))
              }).toArray

          table match {
            case t: SupportsDeleteV2 if t.canDeleteWhere(filters) =>
              DeleteFromTableExec(t, filters, refreshCache(r)) :: Nil
            case t: SupportsDeleteV2 =>
              throw QueryCompilationErrors.cannotDeleteTableWhereFiltersError(t, filters)
            case t: TruncatableTable if condition == TrueLiteral =>
              TruncateTableExec(t, refreshCache(r)) :: Nil
            case _ =>
              throw QueryCompilationErrors.tableDoesNotSupportDeletesError(table)
          }
        case LogicalRelationWithTable(_, Some(catalogTable)) =>
          val tableIdentifier = catalogTable.identifier
          throw QueryCompilationErrors.unsupportedTableOperationError(
            tableIdentifier,
            "DELETE")
        case other =>
          throw SparkException.internalError("Unexpected table relation: " + other)
      }

    case ReplaceData(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, projections, _,
        Some(write)) =>
      // use the original relation to refresh the cache
      ReplaceDataExec(planLater(query), refreshCache(r), projections, write) :: Nil

    case WriteDelta(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, projections,
        Some(write)) =>
      // use the original relation to refresh the cache
      WriteDeltaExec(planLater(query), refreshCache(r), projections, write) :: Nil

    case MergeRows(isSourceRowPresent, isTargetRowPresent, matchedInstructions,
        notMatchedInstructions, notMatchedBySourceInstructions, checkCardinality, output, child) =>
      MergeRowsExec(isSourceRowPresent, isTargetRowPresent, matchedInstructions,
        notMatchedInstructions, notMatchedBySourceInstructions, checkCardinality,
        output, planLater(child)) :: Nil

    case WriteToContinuousDataSource(writer, query, customMetrics) =>
      WriteToContinuousDataSourceExec(writer, planLater(query), customMetrics) :: Nil

    case DescribeNamespace(ResolvedNamespace(catalog, ns, _), extended, output) =>
      DescribeNamespaceExec(output, catalog.asNamespaceCatalog, ns, extended) :: Nil

    case DescribeRelation(r: ResolvedTable, partitionSpec, isExtended, output) =>
      if (partitionSpec.nonEmpty) {
        throw QueryCompilationErrors.describeDoesNotSupportPartitionForV2TablesError()
      }
      DescribeTableExec(output, r.table, isExtended) :: Nil

    case DescribeColumn(r: ResolvedTable, column, isExtended, output) =>
      column match {
        case c: Attribute =>
          DescribeColumnExec(output, c, isExtended, r.table) :: Nil
        case nested =>
          throw QueryCompilationErrors.commandNotSupportNestedColumnError(
            "DESC TABLE COLUMN", toPrettySQL(nested))
      }

    case DropTable(r: ResolvedIdentifier, ifExists, purge) =>
      val invalidateFunc = () => CommandUtils.uncacheTableOrView(session, r)
      DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil

    case _: NoopCommand =>
      LocalTableScanExec(Nil, Nil, None) :: Nil

    case RenameTable(r @ ResolvedTable(catalog, oldIdent, _, _), newIdent, isView) =>
      if (isView) {
        throw QueryCompilationErrors.cannotRenameTableWithAlterViewError()
      }
      RenameTableExec(
        catalog,
        oldIdent,
        newIdent.asIdentifier,
        invalidateTableCache(r),
        session.sharedState.cacheManager.cacheQuery) :: Nil

    case SetNamespaceProperties(ResolvedNamespace(catalog, ns, _), properties) =>
      AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil

    case SetNamespaceLocation(ResolvedNamespace(catalog, ns, _), location) =>
      if (StringUtils.isEmpty(location)) {
        throw QueryExecutionErrors.invalidEmptyLocationError(location)
      }
      AlterNamespaceSetPropertiesExec(
        catalog.asNamespaceCatalog,
        ns,
        Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(location))) :: Nil

    case CommentOnNamespace(ResolvedNamespace(catalog, ns, _), comment) =>
      AlterNamespaceSetPropertiesExec(
        catalog.asNamespaceCatalog,
        ns,
        Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil

    case CreateNamespace(ResolvedNamespace(catalog, ns, _), ifNotExists, properties) =>
      val location = properties.get(SupportsNamespaces.PROP_LOCATION)
      if (location.isDefined && location.get.isEmpty) {
        throw QueryExecutionErrors.invalidEmptyLocationError(location.get)
      }
      val finalProperties = location.map { loc =>
        properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
      }.getOrElse(properties)
      CreateNamespaceExec(catalog.asNamespaceCatalog, ns, ifNotExists, finalProperties) :: Nil

    case DropNamespace(ResolvedNamespace(catalog, ns, _), ifExists, cascade) =>
      DropNamespaceExec(catalog, ns, ifExists, cascade) :: Nil

    case ShowTables(ResolvedNamespace(catalog, ns, _), pattern, output) =>
      ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil

    case ShowTablesExtended(
        ResolvedNamespace(catalog, ns, _),
        pattern,
        output) =>
      ShowTablesExtendedExec(output, catalog.asTableCatalog, ns, pattern) :: Nil

    case ShowTablePartition(r: ResolvedTable, part, output) =>
      ShowTablePartitionExec(output, r.catalog, r.identifier,
        r.table.asPartitionable, Seq(part).asResolvedPartitionSpecs.head) :: Nil

    case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns, _)) =>
      val catalogManager = session.sessionState.catalogManager
      val namespace = if (ns.nonEmpty) Some(ns) else None
      SetCatalogAndNamespaceExec(catalogManager, Some(catalog.name()), namespace) :: Nil

    case ShowTableProperties(rt: ResolvedTable, propertyKey, output) =>
      ShowTablePropertiesExec(output, rt.table, rt.name, propertyKey) :: Nil

    case AnalyzeTable(_: ResolvedTable, _, _) | AnalyzeColumn(_: ResolvedTable, _, _) =>
      throw QueryCompilationErrors.analyzeTableNotSupportedForV2TablesError()

    case AddPartitions(
        r @ ResolvedTable(_, _, table: SupportsPartitionManagement, _), parts, ignoreIfExists) =>
      AddPartitionExec(
        table,
        parts.asResolvedPartitionSpecs,
        ignoreIfExists,
        recacheTable(r)) :: Nil

    case DropPartitions(
        r @ ResolvedTable(_, _, table: SupportsPartitionManagement, _),
        parts,
        ignoreIfNotExists,
        purge) =>
      DropPartitionExec(
        table,
        parts.asResolvedPartitionSpecs,
        ignoreIfNotExists,
        purge,
        recacheTable(r)) :: Nil

    case RenamePartitions(
        r @ ResolvedTable(_, _, table: SupportsPartitionManagement, _), from, to) =>
      RenamePartitionExec(
        table,
        Seq(from).asResolvedPartitionSpecs.head,
        Seq(to).asResolvedPartitionSpecs.head,
        recacheTable(r)) :: Nil

    case RecoverPartitions(_: ResolvedTable) =>
      throw QueryCompilationErrors.alterTableRecoverPartitionsNotSupportedForV2TablesError()

    case SetTableSerDeProperties(_: ResolvedTable, _, _, _) =>
      throw QueryCompilationErrors.alterTableSerDePropertiesNotSupportedForV2TablesError()

    case LoadData(_: ResolvedTable, _, _, _, _) =>
      throw QueryCompilationErrors.loadDataNotSupportedForV2TablesError()

    case ShowCreateTable(rt: ResolvedTable, asSerde, output) =>
      if (asSerde) {
        throw QueryCompilationErrors.showCreateTableAsSerdeNotSupportedForV2TablesError()
      }
      ShowCreateTableExec(output, rt) :: Nil

    case TruncateTable(r: ResolvedTable) =>
      TruncateTableExec(
        r.table.asTruncatable,
        recacheTable(r)) :: Nil

    case TruncatePartition(r: ResolvedTable, part) =>
      TruncatePartitionExec(
        r.table.asPartitionable,
        Seq(part).asResolvedPartitionSpecs.head,
        recacheTable(r)) :: Nil

    case ShowColumns(resolvedTable: ResolvedTable, ns, output) =>
      ns match {
        case Some(namespace) =>
          val tableNamespace = resolvedTable.identifier.namespace()
          if (namespace.length != tableNamespace.length ||
            !namespace.zip(tableNamespace).forall(SQLConf.get.resolver.tupled)) {
            throw QueryCompilationErrors.showColumnsWithConflictNamespacesError(
              namespace, tableNamespace.toSeq)
          }
        case _ =>
      }
      ShowColumnsExec(output, resolvedTable) :: Nil

    case r @ ShowPartitions(
        ResolvedTable(catalog, _, table: SupportsPartitionManagement, _),
        pattern @ (None | Some(_: ResolvedPartitionSpec)), output) =>
      ShowPartitionsExec(
        output,
        catalog,
        table,
        pattern.map(_.asInstanceOf[ResolvedPartitionSpec])) :: Nil

    case RepairTable(_: ResolvedTable, _, _) =>
      throw QueryCompilationErrors.repairTableNotSupportedForV2TablesError()

    case r: CacheTable =>
      CacheTableExec(r.table, r.multipartIdentifier, r.isLazy, r.options) :: Nil

    case r: CacheTableAsSelect =>
      CacheTableAsSelectExec(
        r.tempViewName, r.plan, r.originalText, r.isLazy, r.options, r.referredTempFunctions) :: Nil

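    // UNCACHE TABLE: cascade to dependent cached plans unless the target is a temporary view.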
    case r: UncacheTable =>
      def isTempView(table: LogicalPlan): Boolean = table match {
        case SubqueryAlias(_, v: View) => v.isTempView
        case _ => false
      }
      UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil

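    // All ALTER TABLE commands share AlterTableExec; validate the table changes against the
    // catalog first.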
    case a: AlterTableCommand if a.table.resolved =>
      val table = a.table.asInstanceOf[ResolvedTable]
      ResolveTableConstraints.validateCatalogForTableChange(
        a.changes, table.catalog, table.identifier)
      AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil

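    // Index DDL (CREATE/DROP INDEX) is only supported for tables that implement SupportsIndex.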
    case CreateIndex(ResolvedTable(_, _, table, _),
        indexName, indexType, ifNotExists, columns, properties) =>
      table match {
        case s: SupportsIndex =>
          val namedRefs = columns.map { case (field, prop) =>
            FieldReference(field.name) -> prop
          }
          CreateIndexExec(s, indexName, indexType, ifNotExists, namedRefs, properties) :: Nil
        case _ => throw QueryCompilationErrors.tableIndexNotSupportedError(
          s"CreateIndex is not supported in this table ${table.name}.")
      }

    case DropIndex(ResolvedTable(_, _, table, _), indexName, ifNotExists) =>
      table match {
        case s: SupportsIndex =>
          DropIndexExec(s, indexName, ifNotExists) :: Nil
        case _ => throw QueryCompilationErrors.tableIndexNotSupportedError(
          s"DropIndex is not supported in this table ${table.name}.")
      }

    case ShowFunctions(
      ResolvedNamespace(catalog, ns, _), userScope, systemScope, pattern, output) =>
      ShowFunctionsExec(
        output,
        catalog.asFunctionCatalog,
        ns,
        userScope,
        systemScope,
        pattern) :: Nil

    case c: Call =>
      ExplainOnlySparkPlan(c) :: Nil

    case _ => Nil
  }