def apply()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala [57:226]


  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    val sparkSession = SparkSQLUtil.getSparkSession
    plan match {
      case _: ReturnAnswer => Nil
      // alter table
      case renameTable: AlterTableRenameCommand
        if isCarbonTable(renameTable.oldName) =>
        ExecutedCommandExec(DDLHelper.renameTable(renameTable)) :: Nil
      case compaction: CarbonAlterTableCompactionCommand =>
        CarbonPlanHelper.compact(compaction, sparkSession)
      case changeColumn: AlterTableChangeColumnCommand
        if isCarbonTable(changeColumn.tableName) =>
        ExecutedCommandExec(DDLHelper.changeColumn(changeColumn, sparkSession)) :: Nil
      case changeColumn: CarbonAlterTableColRenameDataTypeChangeCommand =>
        ExecutedCommandExec(changeColumn) :: Nil
      case colRenameDataTypeChange: CarbonAlterTableColRenameDataTypeChangeCommand =>
        CarbonPlanHelper.changeColumn(colRenameDataTypeChange, sparkSession)
      case addColumns: AlterTableAddColumnsCommand
        if isCarbonTable(addColumns.table) =>
        ExecutedCommandExec(DDLHelper.addColumns(addColumns, sparkSession)) :: Nil
      case addColumn: CarbonAlterTableAddColumnCommand =>
        CarbonPlanHelper.addColumn(addColumn, sparkSession)
      case dropColumn: CarbonAlterTableDropColumnCommand =>
        if (isCarbonTable(TableIdentifier(
          dropColumn.alterTableDropColumnModel.tableName,
          dropColumn.alterTableDropColumnModel.databaseName))) {
          CarbonPlanHelper.dropColumn(dropColumn, sparkSession)
        } else {
          throw new UnsupportedOperationException("Only carbondata table support drop column")
        }
      case AlterTableSetLocationCommand(tableName, _, _)
        if isCarbonTable(tableName) =>
        throw new UnsupportedOperationException("Set partition location is not supported")
      // partition
      case showPartitions: ShowPartitionsCommand
        if isCarbonTable(showPartitions.tableName) =>
        ExecutedCommandExec(DDLHelper.showPartitions(showPartitions, sparkSession)) :: Nil
      case dropPartition: AlterTableDropPartitionCommand
        if isCarbonTable(dropPartition.tableName) =>
        ExecutedCommandExec(DDLHelper.dropPartition(dropPartition, sparkSession)) :: Nil
      case renamePartition: AlterTableRenamePartitionCommand
        if isCarbonTable(renamePartition.tableName) =>
        throw new UnsupportedOperationException("Renaming partition on table is not supported")
      case addPartition: AlterTableAddPartitionCommand
        if isCarbonTable(addPartition.tableName) =>
        ExecutedCommandExec(DDLHelper.addPartition(addPartition)) :: Nil
      // set/unset/reset
      case set: SetCommand =>
        ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
      case MatchResetCommand(_) =>
        ExecutedCommandExec(CarbonResetCommand()) :: Nil
      case setProperties: AlterTableSetPropertiesCommand
        if isCarbonTable(setProperties.tableName) =>
        ExecutedCommandExec(DDLHelper.setProperties(setProperties, sparkSession)) :: Nil
      case unsetProperties: AlterTableUnsetPropertiesCommand
        if isCarbonTable(unsetProperties.tableName) =>
        ExecutedCommandExec(DDLHelper.unsetProperties(unsetProperties)) :: Nil
      // create/describe/drop table
      case createTable: CreateTableCommand
        if isCarbonHiveTable(createTable.table) =>
        // CREATE TABLE STORED AS carbondata
        ExecutedCommandExec(DDLHelper.createHiveTable(createTable)) :: Nil
      case createTable: CreateTableCommand
        if isCarbonFileHiveTable(createTable.table) =>
        // CREATE TABLE STORED AS carbon
        if (EnvHelper.isLegacy(sparkSession)) {
          Nil
        } else {
          ExecutedCommandExec(DDLHelper.createCarbonFileHiveTable(createTable)) :: Nil
        }
      case ctas: CreateHiveTableAsSelectCommand
        if isCarbonHiveTable(ctas.tableDesc) =>
        // CREATE TABLE STORED AS carbondata AS SELECT
        ExecutedCommandExec(DDLHelper.createHiveTableAsSelect(ctas, sparkSession)) :: Nil
      case ctas: CreateHiveTableAsSelectCommand
        if isCarbonFileHiveTable(ctas.tableDesc) =>
        // CREATE TABLE STORED AS carbon AS SELECT
        if (EnvHelper.isLegacy(sparkSession)) {
          Nil
        } else {
          DataWritingCommandExec(
            DDLHelper.createCarbonFileHiveTableAsSelect(ctas),
            planLater(ctas.query)) :: Nil
        }
      case showCreateTable: ShowCreateTableCommand
        if isCarbonTable(showCreateTable.table) =>
        ExecutedCommandExec(CarbonShowCreateTableCommand(showCreateTable)) :: Nil
      case createLikeTable: CreateTableLikeCommand
        if isCarbonTable(createLikeTable.sourceTable) =>
        ExecutedCommandExec(CarbonCreateTableLikeCommand(createLikeTable.sourceTable,
          createLikeTable.targetTable, createLikeTable.ifNotExists)) :: Nil
      case truncateTable: TruncateTableCommand
        if isCarbonTable(truncateTable.tableName) =>
        ExecutedCommandExec(CarbonTruncateCommand(truncateTable)) :: Nil
      case createTable@org.apache.spark.sql.execution.datasources.CreateTable(_, _, None)
        if CarbonSource.isCarbonDataSource(createTable.tableDesc) =>
        // CREATE TABLE USING carbondata
        ExecutedCommandExec(DDLHelper.createDataSourceTable(createTable)) :: Nil
      case MatchCreateDataSourceTable(tableDesc, mode, query)
        if CarbonSource.isCarbonDataSource(tableDesc) =>
        // CREATE TABLE USING carbondata AS SELECT
        ExecutedCommandExec(
          DDLHelper.createDataSourceTableAsSelect(tableDesc, query, mode, sparkSession)
        ) :: Nil
      case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
        if CarbonSource.isCarbonDataSource(tableDesc) =>
        // CREATE TABLE USING carbondata AS SELECT
        ExecutedCommandExec(
          DDLHelper.createDataSourceTableAsSelect(tableDesc, query.get, mode, sparkSession)
        ) :: Nil
      case createTable@CreateDataSourceTableCommand(table, _)
        if CarbonSource.isCarbonDataSource(table) =>
        // CREATE TABLE USING carbondata
        ExecutedCommandExec(DDLHelper.createDataSourceTable(createTable)) :: Nil
      case desc: DescribeTableCommand if isCarbonTable(desc.table) =>
        ExecutedCommandExec(DDLHelper.describeTable(desc, sparkSession)) :: Nil
      case DropTableCommand(identifier, ifNotExists, _, _)
        if isCarbonTable(identifier) =>
        ExecutedCommandExec(
          CarbonDropTableCommand(ifNotExists, identifier.database, identifier.table.toLowerCase)
        ) :: Nil
      // refresh
      case refreshTable: CarbonToSparkAdapter.RefreshTables =>
        ExecutedCommandExec(DDLHelper.refreshTable(refreshTable)) :: Nil
      case refreshResource: RefreshResource =>
        DDLHelper.refreshResource(refreshResource)
      // database
      case createDb: CreateDatabaseCommand =>
        ExecutedCommandExec(DDLHelper.createDatabase(createDb, sparkSession)) :: Nil
      case drop@DropDatabaseCommand(dbName, ifExists, _) =>
        ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
      // explain
      case explain: ExplainCommand =>
        DDLHelper.explain(explain, sparkSession)
      case showTables: ShowTablesCommand =>
        ExecutedCommandExec(CarbonShowTablesCommand(showTables)) :: Nil
      case CarbonCreateSecondaryIndexCommand(
      indexModel, tableProperties, ifNotExists, isDeferredRefresh, isCreateSIndex) =>
        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
          .tableExists(TableIdentifier(indexModel.tableName, indexModel.dbName))(
            sparkSession)
        if (isCarbonTable) {
          ExecutedCommandExec(CarbonCreateSecondaryIndexCommand(
            indexModel, tableProperties, ifNotExists, isDeferredRefresh, isCreateSIndex)) :: Nil
        } else {
          sys.error(s"Operation not allowed because either table " +
            s"${indexModel.tableName} doesn't exist or not a carbon table.")
        }
      case showIndex@ShowIndexesCommand(_, _) =>
        try {
          ExecutedCommandExec(showIndex) :: Nil
        } catch {
          case c: Exception =>
            sys.error("Operation not allowed on non-carbon table")
        }
      case dropIndex@DropIndexCommand(ifExistsSet, databaseNameOp, parentTableName, tableName, _) =>
        val tableIdentifier = TableIdentifier(parentTableName, databaseNameOp)
        val isParentTableExists = sparkSession.sessionState.catalog.tableExists(tableIdentifier)
        if (!isParentTableExists) {
          if (!ifExistsSet) {
            sys.error(s"Table $tableIdentifier does not exist")
          } else {
            Nil
          }
        } else {
          ExecutedCommandExec(dropIndex) :: Nil
        }
      case _ => Nil
    }
  }