in integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala [88:370]
/**
 * Applies a column rename and/or data type change (and optional column comment change)
 * to the schema of a transactional carbon table.
 *
 * Steps performed, in order:
 *  1. Record audit info and acquire the metadata and compaction locks.
 *  2. Validate the request (transactional table, index restrictions, spatial index
 *     columns, column existence, partition columns, rename / datatype rules).
 *  3. Fire the pre event, rewrite the affected thrift column schemas (the altered column
 *     and, for complex types, its children) while building one schema evolution entry.
 *  4. Update table properties on rename, persist the schema through
 *     `updateSchemaAndRefreshTable` and fire the post event.
 *
 * If any `Exception` is raised inside the locked section, the changes are reverted with
 * `AlterTableUtil.revertColumnRenameAndDataTypeChanges` (only when the table was resolved
 * and is transactional) and the failure is reported through `throwMetadataException`.
 * The acquired locks are always released in the `finally` block.
 *
 * @param sparkSession session used to resolve the current database, the meta store and
 *                     the carbon table
 * @return an empty sequence; the command produces no rows
 */
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
  val tableName = alterTableColRenameAndDataTypeChangeModel.tableName
  val dbName = alterTableColRenameAndDataTypeChangeModel.databaseName
    .getOrElse(sparkSession.catalog.currentDatabase)
  // declared outside the try block because the catch block picks its message based on it
  var isDataTypeChange = false
  setAuditTable(dbName, tableName)
  setAuditInfo(Map(
    "column" -> alterTableColRenameAndDataTypeChangeModel.columnName,
    "newColumn" -> alterTableColRenameAndDataTypeChangeModel.newColumnName,
    "newType" -> alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType))
  val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
  var locks = List.empty[ICarbonLock]
  // get the latest carbon table and check for column existence
  var carbonTable: CarbonTable = null
  // timestamp of the latest schema evolution entry; handed to the revert logic on failure
  var timeStamp = 0L
  try {
    locks = AlterTableUtil
      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
    carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
    if (!carbonTable.isTransactionalTable) {
      throw new MalformedCarbonCommandException(
        "Unsupported operation on non transactional table")
    }
    if (!alterTableColRenameAndDataTypeChangeModel.isColumnRename &&
        !carbonTable.canAllow(carbonTable, TableOperation.ALTER_CHANGE_DATATYPE,
          alterTableColRenameAndDataTypeChangeModel.columnName)) {
      throw new MalformedCarbonCommandException(
        "alter table change datatype is not supported for index indexSchema")
    }
    if (alterTableColRenameAndDataTypeChangeModel.isColumnRename &&
        !carbonTable.canAllow(carbonTable, TableOperation.ALTER_COLUMN_RENAME,
          alterTableColRenameAndDataTypeChangeModel.columnName)) {
      throw new MalformedCarbonCommandException(
        "alter table column rename is not supported for index indexSchema")
    }
    // Do not allow spatial index column and its source columns to be changed.
    AlterTableUtil.validateColumnsWithSpatialIndexProperties(carbonTable,
      List(alterTableColRenameAndDataTypeChangeModel.columnName))
    val operationContext = new OperationContext
    operationContext.setProperty("childTableColumnRename", childTableColumnRename)
    val alterTableColRenameAndDataTypeChangePreEvent =
      AlterTableColRenameAndDataTypeChangePreEvent(sparkSession, carbonTable,
        alterTableColRenameAndDataTypeChangeModel)
    OperationListenerBus.getInstance()
      .fireEvent(alterTableColRenameAndDataTypeChangePreEvent, operationContext)
    val newColumnName = alterTableColRenameAndDataTypeChangeModel.newColumnName.toLowerCase
    val oldColumnName = alterTableColRenameAndDataTypeChangeModel.columnName.toLowerCase
    val isColumnRename = alterTableColRenameAndDataTypeChangeModel.isColumnRename
    if (isColumnRename) {
      alteredColumnNamesMap += (oldColumnName -> newColumnName)
    }
    val newColumnComment = alterTableColRenameAndDataTypeChangeModel.newColumnComment
    val carbonColumns = carbonTable.getCreateOrderColumn().asScala.filter(!_.isInvisible)
    if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(oldColumnName))) {
      throwMetadataException(dbName, tableName, s"Column does not exist: $oldColumnName")
    }
    val oldCarbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(oldColumnName))
    if (oldCarbonColumn.size != 1) {
      throwMetadataException(dbName, tableName, s"Invalid Column: $oldColumnName")
    }
    val newColumnPrecision = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
    val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
    // set isDataTypeChange flag
    val oldDatatype = oldCarbonColumn.head.getDataType
    val newDatatype = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType
    if (oldDatatype.getName.equalsIgnoreCase(newDatatype)) {
      // if the source datatype is decimal and there is change in precision and scale, then
      // along with rename, datatype change is also required for the command, so set the
      // isDataTypeChange flag to true in this case
      if (DataTypes.isDecimal(oldDatatype) &&
          (oldDatatype.asInstanceOf[DecimalType].getPrecision != newColumnPrecision ||
           oldDatatype.asInstanceOf[DecimalType].getScale != newColumnScale)) {
        isDataTypeChange = true
      }
      if (oldDatatype.isComplexType) {
        // the shared alteredColumnNamesMap / alteredDatatypesMap are passed to the
        // validation; they are consulted below when rewriting the child column schemas
        val oldParent = oldCarbonColumn.head
        val oldChildren = oldParent.asInstanceOf[CarbonDimension].getListOfChildDimensions.asScala
          .toList
        AlterTableUtil.validateComplexStructure(oldChildren,
          alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.getChildren(),
          alteredColumnNamesMap, alteredDatatypesMap)
      }
    } else {
      if (oldDatatype.isComplexType) {
        throw new UnsupportedOperationException(
          "Old and new complex columns are not compatible in structure")
      }
      isDataTypeChange = true
    }
    // If there is no column rename, datatype change or comment change,
    // return directly without execution (the finally block still releases the locks)
    if (!isColumnRename && !isDataTypeChange && newColumnComment.isEmpty &&
        alteredColumnNamesMap.isEmpty && alteredDatatypesMap.isEmpty) {
      return Seq.empty
    }
    // if column datatype change operation is on partition column, then fail the
    // change column operation
    if (null != carbonTable.getPartitionInfo) {
      val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
      partitionColumns.asScala.foreach {
        col =>
          if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
            // NOTE(review): the check matches oldColumnName but the message reports
            // newColumnName; left unchanged in case callers/tests rely on the text - confirm
            throw new InvalidOperationException(
              s"Alter on partition column $newColumnName is not supported")
          }
      }
    }
    if (!alteredColumnNamesMap.isEmpty) {
      // validate the columns to be renamed
      validColumnsForRenaming(carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala,
        alteredColumnNamesMap, carbonTable)
    }
    if (isDataTypeChange) {
      // validate the columns to change datatype
      AlterTableUtil.validateColumnDataType(alterTableColRenameAndDataTypeChangeModel
        .dataTypeInfo,
        oldCarbonColumn.head)
    }
    // read the latest schema file
    val tableInfo: TableInfo =
      metaStore.getThriftTableInfo(carbonTable)
    // maintain the added column for schema evolution history
    var addedTableColumnSchema: ColumnSchema = null
    var deletedColumnSchema: ColumnSchema = null
    var schemaEvolutionEntry: SchemaEvolutionEntry = null
    val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
    var addedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]
    var deletedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]
    /*
     * columnSchemaList is a flat structure containing all column schemas including both parent
     * and child.
     * It is iterated and rename/change-datatype updates are made in this list itself.
     * An entry is made to the schemaEvolutionEntry for each of the updates.
     */
    columnSchemaList.foreach { columnSchema =>
      val columnName = columnSchema.column_name
      val isTableColumnAltered = columnName.equalsIgnoreCase(oldColumnName)
      var isSchemaEntryRequired = false
      // snapshot of the schema before modification; recorded as the "deleted" column
      deletedColumnSchema = columnSchema.deepCopy()
      if (isTableColumnAltered) {
        // isColumnRename will be true if the table-column/parent-column name has been altered,
        // just get the columnSchema and rename, and make a schemaEvolutionEntry
        if (isColumnRename) {
          columnSchema.setColumn_name(newColumnName)
          isSchemaEntryRequired = true
        }
        // if the table column rename is false, it will be just table column datatype change
        // only, then change the datatype and make an evolution entry, If both the operations
        // are happening, then rename, change datatype and make an evolution entry
        if (isDataTypeChange) {
          // if only datatype change, just get the column schema and change datatype, make a
          // schemaEvolutionEntry
          columnSchema.setData_type(
            DataTypeConverterUtil.convertToThriftDataType(
              alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType))
          columnSchema
            .setPrecision(newColumnPrecision)
          columnSchema.setScale(newColumnScale)
          isSchemaEntryRequired = true
        }
        // a comment change alone updates the column properties but does not, by itself,
        // request a schema evolution entry
        if (newColumnComment.isDefined && columnSchema.getColumnProperties != null) {
          columnSchema.getColumnProperties.put(
            CarbonCommonConstants.COLUMN_COMMENT, newColumnComment.get)
        } else if (newColumnComment.isDefined) {
          val newColumnProperties = new util.HashMap[String, String]
          newColumnProperties.put(CarbonCommonConstants.COLUMN_COMMENT, newColumnComment.get)
          columnSchema.setColumnProperties(newColumnProperties)
        }
        addedTableColumnSchema = columnSchema
      } else if (isComplexChild(columnSchema)) {
        // check if name is altered
        if (!alteredColumnNamesMap.isEmpty) {
          if (alteredColumnNamesMap.contains(columnName)) {
            // matches exactly
            val newComplexChildName = alteredColumnNamesMap(columnName)
            columnSchema.setColumn_name(newComplexChildName)
            isSchemaEntryRequired = true
          } else {
            val alteredParent = checkIfParentIsAltered(columnName)
            /*
             * Lets say, if complex schema is: str struct<a: int>
             * and if parent column is changed from str -> str2
             * then its child name should also be changed from str.a -> str2.a
             */
            if (alteredParent != null) {
              val newParent = alteredColumnNamesMap(alteredParent)
              // Split on the literal parent name and only at its first occurrence, so the
              // whole remainder of the child path is kept even when the parent name shows
              // up again later in it (e.g. parent "s", child "s.s1") and so that '.' in a
              // nested parent name is not interpreted as a regex wildcard.
              val newComplexChildName = newParent +
                columnName.split(java.util.regex.Pattern.quote(alteredParent), 2)(1)
              columnSchema.setColumn_name(newComplexChildName)
              isSchemaEntryRequired = true
            }
          }
        }
        // check if datatype is altered
        alteredDatatypesMap.get(columnName).foreach { newChildDatatype =>
          if (newChildDatatype.equals(CarbonCommonConstants.LONG)) {
            columnSchema.setData_type(DataType.LONG)
          } else if (newChildDatatype.contains(CarbonCommonConstants.DECIMAL)) {
            val (newPrecision, newScale) = CommonUtil.getScaleAndPrecision(newChildDatatype)
            columnSchema.setPrecision(newPrecision)
            columnSchema.setScale(newScale)
          }
          isSchemaEntryRequired = true
        }
      }
      // make a new schema evolution entry after column rename or datatype change
      if (isSchemaEntryRequired) {
        addedColumnsList ++= List(columnSchema)
        deletedColumnsList ++= List(deletedColumnSchema)
        timeStamp = System.currentTimeMillis()
        schemaEvolutionEntry = AlterTableUtil.addNewSchemaEvolutionEntry(schemaEvolutionEntry,
          addedColumnsList,
          deletedColumnsList,
          timeStamp)
      }
    }
    // modify the table Properties with new column name if column rename happened
    if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
      AlterTableUtil
        .modifyTablePropertiesAfterColumnRename(tableInfo.fact_table.tableProperties.asScala,
          oldColumnName, newColumnName)
    }
    updateSchemaAndRefreshTable(sparkSession,
      carbonTable,
      tableInfo,
      addedTableColumnSchema,
      schemaEvolutionEntry,
      oldCarbonColumn.head)
    // NOTE(review): judging by its name this is a test hook for the revert path - confirm
    new MockClassForAlterRevertTests().mockForAlterAddColRevertTest()
    val alterTableColRenameAndDataTypeChangePostEvent
      : AlterTableColRenameAndDataTypeChangePostEvent =
      AlterTableColRenameAndDataTypeChangePostEvent(sparkSession, carbonTable,
        alterTableColRenameAndDataTypeChangeModel)
    OperationListenerBus.getInstance
      .fireEvent(alterTableColRenameAndDataTypeChangePostEvent, operationContext)
    if (isDataTypeChange) {
      LOGGER
        .info(s"Alter table for column rename or data type change is successful for table " +
              s"$dbName.$tableName")
    }
    if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
      LOGGER.info(s"Alter table for column rename is successful for table $dbName.$tableName")
    }
  } catch {
    case e: Exception =>
      // carbonTable is still null when the failure happened before the table was resolved
      if (carbonTable != null) {
        if (carbonTable.isTransactionalTable) {
          AlterTableUtil
            .revertColumnRenameAndDataTypeChanges(carbonTable, timeStamp)(sparkSession)
        }
      }
      if (isDataTypeChange) {
        throwMetadataException(dbName, tableName,
          s"Alter table data type change operation failed: ${ e.getMessage }")
      } else {
        throwMetadataException(dbName, tableName,
          s"Alter table data type change or column rename operation failed: ${ e.getMessage }")
      }
  } finally {
    // release lock after command execution completion
    AlterTableUtil.releaseLocks(locks)
  }
  Seq.empty
}