override def processMetadata()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala [46:202]


  override def processMetadata(sparkSession: SparkSession): Seq[Nothing] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    val oldTableName = alterTableRenameModel.oldTableIdentifier.table.toLowerCase
    val newTableName = alterTableRenameModel.newTableIdentifier.table.toLowerCase
    val oldDatabaseName = alterTableRenameModel.oldTableIdentifier.database
      .getOrElse(sparkSession.catalog.currentDatabase)
    val oldTableIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName))
    val newTableIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
    setAuditTable(oldDatabaseName, oldTableIdentifier.table)
    setAuditInfo(Map("newName" -> alterTableRenameModel.newTableIdentifier.table))
    val newDatabaseName = newTableIdentifier.database
      .getOrElse(sparkSession.catalog.currentDatabase)
    if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
      throw new MalformedCarbonCommandException("Database name should be same for both tables")
    }
    val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
    if (tableExists) {
      throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
                                                s"already exists")
    }
    LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
    val relation: CarbonRelation =
      metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
        .asInstanceOf[CarbonRelation]
    if (relation == null) {
      throwMetadataException(oldDatabaseName, oldTableName, "Table does not exist")
    }

    var carbonTable: CarbonTable = relation.carbonTable
    if (!carbonTable.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
    }

    if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_RENAME)) {
      throw new MalformedCarbonCommandException("alter rename is not supported for this table")
    }
    // if table have created MV, not support table rename
    if (MVManagerInSpark.get(sparkSession).hasSchemaOnTable(carbonTable) || carbonTable.isMV) {
      throw new MalformedCarbonCommandException(
        "alter rename is not supported for MV table or for tables which have child MV")
    }

    var timeStamp = 0L
    var hiveRenameSuccess = false
    // lock file path to release locks after operation
    var carbonTableLockFilePath: String = null
    var originalIndexStatusBeforeDisable: IndexStatus = null
    try {
      carbonTableLockFilePath = carbonTable.getTablePath
      // if any load is in progress for table, do not allow rename table
      if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
        throw new ConcurrentOperationException(carbonTable, "loading", "alter table rename")
      }
      // invalid index for the old table, see CARBON-1690
      val oldAbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
      IndexStoreManager.getInstance().clearIndex(oldAbsoluteTableIdentifier)
      // get the latest carbon table and check for column existence
      val operationContext = new OperationContext
      if (carbonTable.isIndexTable) {
        val oldIndexName = alterTableRenameModel.oldTableIdentifier.table
        val parentTableName = carbonTable.getParentTableName
        val parentTable: CarbonTable = CarbonEnv.getCarbonTable(
          Some(oldDatabaseName), parentTableName)(sparkSession)
        originalIndexStatusBeforeDisable = CarbonIndexUtil.updateIndexInfo(
          parentTable, oldIndexName, IndexType.SI, IndexStatus.DISABLED)(sparkSession)
      } else {
        // TODO: Pass new Table Path in pre-event.
        val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
          carbonTable, alterTableRenameModel, "", sparkSession)
        OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
      }
      val tableInfo: org.apache.carbondata.format.TableInfo =
        metastore.getThriftTableInfo(carbonTable)
      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
      schemaEvolutionEntry.setTableName(newTableName)
      timeStamp = System.currentTimeMillis()
      schemaEvolutionEntry.setTime_stamp(timeStamp)
      val newCarbonTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
      sparkSession.catalog.refreshTable(oldTableIdentifier.quotedString)
      CarbonSessionCatalogUtil.alterTableRename(
        oldTableIdentifier,
        newTableIdentifier,
        oldAbsoluteTableIdentifier.getTablePath,
        sparkSession,
        carbonTable.isExternalTable)
      hiveRenameSuccess = true

      metastore.updateTableSchemaForAlter(
        newCarbonTableIdentifier,
        carbonTable.getCarbonTableIdentifier,
        tableInfo,
        schemaEvolutionEntry,
        carbonTable.getTablePath)(sparkSession)
      new MockClassForAlterRevertTests().mockForAlterRevertTest()

      if (carbonTable.isIndexTable) {
        val oldIndexName = alterTableRenameModel.oldTableIdentifier.table
        val parentTableName = carbonTable.getParentTableName
        val parentTable: CarbonTable = metastore
          .lookupRelation(Some(oldDatabaseName), parentTableName)(sparkSession)
          .asInstanceOf[CarbonRelation].carbonTable
        CarbonIndexUtil.updateIndexInfo(parentTable, oldIndexName, IndexType.SI,
          originalIndexStatusBeforeDisable, newTableName)(sparkSession)
        metastore.lookupRelation(Some(oldDatabaseName), newTableName)(sparkSession)
          .asInstanceOf[CarbonRelation]
        AlterTableUtil.updateSchemaInfo(parentTable, null,
          metastore.getThriftTableInfo(parentTable))(sparkSession)
      } else {
        val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
          carbonTable,
          alterTableRenameModel,
          oldAbsoluteTableIdentifier.getTablePath,
          sparkSession)
        OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
      }

      sparkSession.catalog.refreshTable(newTableIdentifier.quotedString)
      LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
    } catch {
      case e: ConcurrentOperationException =>
        throw e
      case e: Exception =>
        if (hiveRenameSuccess) {
          CarbonSessionCatalogUtil.alterTableRename(
            newTableIdentifier,
            oldTableIdentifier,
            carbonTable.getAbsoluteTableIdentifier.getTablePath,
            sparkSession,
            carbonTable.isExternalTable)
        }
        // it means rename table is index table and disable index has succeed, need revert
        if (originalIndexStatusBeforeDisable != null &&
            !originalIndexStatusBeforeDisable.equals(IndexStatus.DISABLED)) {
          val oldIndexName = alterTableRenameModel.oldTableIdentifier.table
          val parentTableName = carbonTable.getParentTableName
          val parentTable: CarbonTable = CarbonEnv.getCarbonTable(
            Some(oldDatabaseName), parentTableName)(sparkSession)
          CarbonIndexUtil.updateIndexInfo(parentTable, oldIndexName, IndexType.SI,
            originalIndexStatusBeforeDisable)(sparkSession)
          metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
            .asInstanceOf[CarbonRelation]
        }
        if (carbonTable != null) {
          AlterTableUtil.revertRenameTableChanges(
            newTableName,
            carbonTable,
            timeStamp)(
            sparkSession)
        }
        throwMetadataException(oldDatabaseName, oldTableName,
          opName + " operation failed: " + e.getMessage)
    }
    Seq.empty
  }