in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java [1880:1989]
private Operation convertAlterTableChangeCol(
CatalogBaseTable alteredTable, String[] qualified, HiveParserASTNode ast)
throws SemanticException {
String newComment = null;
boolean first = false;
String flagCol = null;
boolean isCascade = false;
// col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name]
// [CASCADE|RESTRICT]
String oldColName = ast.getChild(0).getText();
String newColName = ast.getChild(1).getText();
String newType =
HiveParserBaseSemanticAnalyzer.getTypeStringFromAST(
(HiveParserASTNode) ast.getChild(2));
int childCount = ast.getChildCount();
for (int i = 3; i < childCount; i++) {
HiveParserASTNode child = (HiveParserASTNode) ast.getChild(i);
switch (child.getToken().getType()) {
case HiveASTParser.StringLiteral:
newComment = HiveParserBaseSemanticAnalyzer.unescapeSQLString(child.getText());
break;
case HiveASTParser.TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION:
flagCol =
HiveParserBaseSemanticAnalyzer.unescapeIdentifier(
child.getChild(0).getText());
break;
case HiveASTParser.KW_FIRST:
first = true;
break;
case HiveASTParser.TOK_CASCADE:
isCascade = true;
break;
case HiveASTParser.TOK_RESTRICT:
break;
default:
throw new ValidationException(
"Unsupported token: " + child.getToken() + " for alter table");
}
}
// Validate the operation of renaming a column name.
Table tab = getTable(new ObjectPath(qualified[0], qualified[1]));
SkewedInfo skewInfo = tab.getTTable().getSd().getSkewedInfo();
if ((null != skewInfo)
&& (null != skewInfo.getSkewedColNames())
&& skewInfo.getSkewedColNames().contains(oldColName)) {
throw new ValidationException(
oldColName + ErrorMsg.ALTER_TABLE_NOT_ALLOWED_RENAME_SKEWED_COLUMN.getMsg());
}
String tblName = HiveParserBaseSemanticAnalyzer.getDotName(qualified);
ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
ResolvedCatalogTable oldTable = (ResolvedCatalogTable) alteredTable;
String oldName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(oldColName);
String newName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(newColName);
if (oldTable.getPartitionKeys().contains(oldName)) {
// disallow changing partition columns
throw new ValidationException("CHANGE COLUMN cannot be applied to partition columns");
}
ResolvedSchema oldSchema = oldTable.getResolvedSchema();
Column newTableColumn =
Column.physical(
newName,
HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(newType)));
ResolvedSchema newSchema =
TableSchemaUtils.changeColumn(oldSchema, oldName, newTableColumn, first, flagCol);
Map<String, String> props = new HashMap<>(oldTable.getOptions());
props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name());
if (isCascade) {
props.put(ALTER_COL_CASCADE, "true");
}
Column oldColumn =
oldSchema
.getColumn(oldName)
.orElseThrow(
() ->
new ValidationException(
"Can not find the old column: " + oldColName));
if (newComment != null) {
newTableColumn = newTableColumn.withComment(newComment);
} else {
newTableColumn = newTableColumn.withComment(oldColumn.getComment().orElse(null));
}
List<TableChange> tableChanges =
TableSchemaUtils.buildModifyColumnChange(
oldColumn,
newTableColumn,
first
? TableChange.ColumnPosition.first()
: (flagCol == null
? null
: TableChange.ColumnPosition.after(flagCol)));
return new AlterTableChangeOperation(
tableIdentifier,
tableChanges,
new ResolvedCatalogTable(
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(newSchema).build(),
oldTable.getComment(),
oldTable.getPartitionKeys(),
props),
newSchema),
false);
}