override def processMetadata()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala [88:370]


  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    val tableName = alterTableColRenameAndDataTypeChangeModel.tableName
    val dbName = alterTableColRenameAndDataTypeChangeModel.databaseName
      .getOrElse(sparkSession.catalog.currentDatabase)
    var isDataTypeChange = false
    setAuditTable(dbName, tableName)
    setAuditInfo(Map(
      "column" -> alterTableColRenameAndDataTypeChangeModel.columnName,
      "newColumn" -> alterTableColRenameAndDataTypeChangeModel.newColumnName,
      "newType" -> alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType))
    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
    var locks = List.empty[ICarbonLock]
    // get the latest carbon table and check for column existence
    var carbonTable: CarbonTable = null
    var timeStamp = 0L
    try {
      locks = AlterTableUtil
        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
      val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
      if (!carbonTable.isTransactionalTable) {
        throw new MalformedCarbonCommandException(
          "Unsupported operation on non transactional table")
      }
      if (!alterTableColRenameAndDataTypeChangeModel.isColumnRename &&
          !carbonTable.canAllow(carbonTable, TableOperation.ALTER_CHANGE_DATATYPE,
            alterTableColRenameAndDataTypeChangeModel.columnName)) {
        throw new MalformedCarbonCommandException(
          "alter table change datatype is not supported for index indexSchema")
      }
      if (alterTableColRenameAndDataTypeChangeModel.isColumnRename &&
          !carbonTable.canAllow(carbonTable, TableOperation.ALTER_COLUMN_RENAME,
            alterTableColRenameAndDataTypeChangeModel.columnName)) {
        throw new MalformedCarbonCommandException(
          "alter table column rename is not supported for index indexSchema")
      }
      // Do not allow spatial index column and its source columns to be changed.
      AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable,
        List(alterTableColRenameAndDataTypeChangeModel.columnName))
      val operationContext = new OperationContext
      operationContext.setProperty("childTableColumnRename", childTableColumnRename)
      val alterTableColRenameAndDataTypeChangePreEvent =
        AlterTableColRenameAndDataTypeChangePreEvent(sparkSession, carbonTable,
          alterTableColRenameAndDataTypeChangeModel)
      OperationListenerBus.getInstance()
        .fireEvent(alterTableColRenameAndDataTypeChangePreEvent, operationContext)
      val newColumnName = alterTableColRenameAndDataTypeChangeModel.newColumnName.toLowerCase
      val oldColumnName = alterTableColRenameAndDataTypeChangeModel.columnName.toLowerCase
      val isColumnRename = alterTableColRenameAndDataTypeChangeModel.isColumnRename
      if (isColumnRename) {
        alteredColumnNamesMap += (oldColumnName -> newColumnName)
      }
      val newColumnComment = alterTableColRenameAndDataTypeChangeModel.newColumnComment
      val carbonColumns = carbonTable.getCreateOrderColumn().asScala.filter(!_.isInvisible)
      if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(oldColumnName))) {
        throwMetadataException(dbName, tableName, s"Column does not exist: $oldColumnName")
      }

      val oldCarbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(oldColumnName))
      if (oldCarbonColumn.size != 1) {
        throwMetadataException(dbName, tableName, s"Invalid Column: $oldColumnName")
      }
      val newColumnPrecision = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
      val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
      // set isDataTypeChange flag
      val oldDatatype = oldCarbonColumn.head.getDataType
      val newDatatype = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType
      if (oldDatatype.getName.equalsIgnoreCase(newDatatype)) {
        val newColumnPrecision =
          alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
        val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
        // if the source datatype is decimal and there is change in precision and scale, then
        // along with rename, datatype change is also required for the command, so set the
        // isDataTypeChange flag to true in this case
        if (DataTypes.isDecimal(oldDatatype) &&
            (oldDatatype.asInstanceOf[DecimalType].getPrecision !=
             newColumnPrecision ||
             oldDatatype.asInstanceOf[DecimalType].getScale !=
             newColumnScale)) {
          isDataTypeChange = true
        }
        if (oldDatatype.isComplexType) {
          val oldParent = oldCarbonColumn.head
          val oldChildren = oldParent.asInstanceOf[CarbonDimension].getListOfChildDimensions.asScala
            .toList
          AlterTableUtil.validateComplexStructure(oldChildren,
            alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.getChildren(),
            alteredColumnNamesMap, alteredDatatypesMap)
        }
      } else {
        if (oldDatatype.isComplexType) {
          throw new UnsupportedOperationException(
            "Old and new complex columns are not compatible in structure")
        }
        isDataTypeChange = true
      }

      // If there is no columnrename and datatype change and comment change
      // return directly without execution
      if (!isColumnRename && !isDataTypeChange && !newColumnComment.isDefined &&
          alteredColumnNamesMap.isEmpty && alteredDatatypesMap.isEmpty) {
        return Seq.empty
      }
      // if column datatype change operation is on partition column, then fail the
      // chang column operation
      if (null != carbonTable.getPartitionInfo) {
        val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
        partitionColumns.asScala.foreach {
          col =>
            if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
              throw new InvalidOperationException(
                s"Alter on partition column $newColumnName is not supported")
            }
        }
      }

      if (!alteredColumnNamesMap.isEmpty) {
        // validate the columns to be renamed
        validColumnsForRenaming(carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala,
          alteredColumnNamesMap, carbonTable)
      }

      if (isDataTypeChange) {
        // validate the columns to change datatype
        AlterTableUtil.validateColumnDataType(alterTableColRenameAndDataTypeChangeModel
          .dataTypeInfo,
          oldCarbonColumn.head)
      }
      // read the latest schema file
      val tableInfo: TableInfo =
        metaStore.getThriftTableInfo(carbonTable)
      // maintain the added column for schema evolution history
      var addedTableColumnSchema: ColumnSchema = null
      var deletedColumnSchema: ColumnSchema = null
      var schemaEvolutionEntry: SchemaEvolutionEntry = null
      val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)

      var addedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]
      var deletedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]

      /*
      * columnSchemaList is a flat structure containing all column schemas including both parent
      * and child.
      * It is iterated and rename/change-datatype update are made in this list itself.
      * Entry is made to the schemaEvolutionEntry for each of the update.
      */
      columnSchemaList.foreach { columnSchema =>
        val columnName = columnSchema.column_name
        val isTableColumnAltered = columnName.equalsIgnoreCase(oldColumnName)
        var isSchemaEntryRequired = false
        deletedColumnSchema = columnSchema.deepCopy()

        if (isTableColumnAltered) {
          // isColumnRename will be true if the table-column/parent-column name has been altered,
          // just get the columnSchema and rename, and make a schemaEvolutionEntry
          if (isColumnRename) {
            columnSchema.setColumn_name(newColumnName)
            isSchemaEntryRequired = true
          }

          // if the table column rename is false, it will be just table column datatype change
          // only, then change the datatype and make an evolution entry, If both the operations
          // are happening, then rename, change datatype and make an evolution entry
          if (isDataTypeChange) {
            // if only datatype change,  just get the column schema and change datatype, make a
            // schemaEvolutionEntry
            columnSchema.setData_type(
              DataTypeConverterUtil.convertToThriftDataType(
                alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType))
            columnSchema
              .setPrecision(newColumnPrecision)
            columnSchema.setScale(newColumnScale)
            isSchemaEntryRequired = true
          }

          if (newColumnComment.isDefined && columnSchema.getColumnProperties != null) {
            columnSchema.getColumnProperties.put(
              CarbonCommonConstants.COLUMN_COMMENT, newColumnComment.get)
          } else if (newColumnComment.isDefined) {
            val newColumnProperties = new util.HashMap[String, String]
            newColumnProperties.put(CarbonCommonConstants.COLUMN_COMMENT, newColumnComment.get)
            columnSchema.setColumnProperties(newColumnProperties)
          }
          addedTableColumnSchema = columnSchema
        } else if (isComplexChild(columnSchema)) {
          // check if name is altered
          if (!alteredColumnNamesMap.isEmpty) {
            if (alteredColumnNamesMap.contains(columnName)) {
              // matches exactly
              val newComplexChildName = alteredColumnNamesMap(columnName)
              columnSchema.setColumn_name(newComplexChildName)
              isSchemaEntryRequired = true
            } else {
              val alteredParent = checkIfParentIsAltered(columnName)
              /*
               * Lets say, if complex schema is: str struct<a: int>
               * and if parent column is changed from str -> str2
               * then its child name should also be changed from str.a -> str2.a
               */
              if (alteredParent != null) {
                val newParent = alteredColumnNamesMap(alteredParent)
                val newComplexChildName = newParent + columnName
                  .split(alteredParent)(1)
                columnSchema.setColumn_name(newComplexChildName)
                isSchemaEntryRequired = true
              }
            }
          }
          // check if datatype is altered
          if (!alteredDatatypesMap.isEmpty && alteredDatatypesMap.get(columnName) != None) {
            val newDatatype = alteredDatatypesMap.get(columnName).get
            if (newDatatype.equals(CarbonCommonConstants.LONG)) {
              columnSchema.setData_type(DataType.LONG)
            } else if (newDatatype.contains(CarbonCommonConstants.DECIMAL)) {
              val (newPrecision, newScale) = CommonUtil.getScaleAndPrecision(newDatatype)
              columnSchema.setPrecision(newPrecision)
              columnSchema.setScale(newScale)
            }
            isSchemaEntryRequired = true
          }
        }

        // make a new schema evolution entry after column rename or datatype change
        if (isSchemaEntryRequired) {
          addedColumnsList ++= List(columnSchema)
          deletedColumnsList ++= List(deletedColumnSchema)
          timeStamp = System.currentTimeMillis()
          schemaEvolutionEntry = AlterTableUtil.addNewSchemaEvolutionEntry(schemaEvolutionEntry,
            addedColumnsList,
            deletedColumnsList,
            timeStamp)
        }
      }

      // modify the table Properties with new column name if column rename happened
      if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
        AlterTableUtil
          .modifyTablePropertiesAfterColumnRename(tableInfo.fact_table.tableProperties.asScala,
            oldColumnName, newColumnName)
      }
      updateSchemaAndRefreshTable(sparkSession,
        carbonTable,
        tableInfo,
        addedTableColumnSchema,
        schemaEvolutionEntry,
        oldCarbonColumn.head)
      new MockClassForAlterRevertTests().mockForAlterAddColRevertTest()
      val alterTableColRenameAndDataTypeChangePostEvent
      : AlterTableColRenameAndDataTypeChangePostEvent =
        AlterTableColRenameAndDataTypeChangePostEvent(sparkSession, carbonTable,
          alterTableColRenameAndDataTypeChangeModel)
      OperationListenerBus.getInstance
        .fireEvent(alterTableColRenameAndDataTypeChangePostEvent, operationContext)
      if (isDataTypeChange) {
        LOGGER
          .info(s"Alter table for column rename or data type change is successful for table " +
                s"$dbName.$tableName")
      }
      if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
        LOGGER.info(s"Alter table for column rename is successful for table $dbName.$tableName")
      }
    } catch {
      case e: Exception =>
        if (carbonTable != null) {
          if (carbonTable.isTransactionalTable) {
            AlterTableUtil
              .revertColumnRenameAndDataTypeChanges(carbonTable, timeStamp)(sparkSession)
          }
        }
        if (isDataTypeChange) {
          throwMetadataException(dbName, tableName,
            s"Alter table data type change operation failed: ${ e.getMessage }")
        } else {
          throwMetadataException(dbName, tableName,
            s"Alter table data type change or column rename operation failed: ${ e.getMessage }")
        }
    } finally {
      // release lock after command execution completion
      AlterTableUtil.releaseLocks(locks)
    }
    Seq.empty
  }