in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java [537:657]
private List<SchemaChange> toSchemaChange(
TableChange change, Map<String, Integer> oldTableNonPhysicalColumnIndex) {
List<SchemaChange> schemaChanges = new ArrayList<>();
if (change instanceof AddColumn) {
if (((AddColumn) change).getColumn().isPhysical()) {
AddColumn add = (AddColumn) change;
String comment = add.getColumn().getComment().orElse(null);
SchemaChange.Move move = getMove(add.getPosition(), add.getColumn().getName());
schemaChanges.add(
SchemaChange.addColumn(
add.getColumn().getName(),
LogicalTypeConversion.toDataType(
add.getColumn().getDataType().getLogicalType()),
comment,
move));
}
return schemaChanges;
} else if (change instanceof AddWatermark) {
AddWatermark add = (AddWatermark) change;
setWatermarkOptions(add.getWatermark(), schemaChanges);
return schemaChanges;
} else if (change instanceof DropColumn) {
if (!oldTableNonPhysicalColumnIndex.containsKey(
((DropColumn) change).getColumnName())) {
DropColumn drop = (DropColumn) change;
schemaChanges.add(SchemaChange.dropColumn(drop.getColumnName()));
}
return schemaChanges;
} else if (change instanceof DropWatermark) {
String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
schemaChanges.add(
SchemaChange.removeOption(compoundKey(watermarkPrefix, WATERMARK_ROWTIME)));
schemaChanges.add(
SchemaChange.removeOption(
compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR)));
schemaChanges.add(
SchemaChange.removeOption(
compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE)));
return schemaChanges;
} else if (change instanceof ModifyColumnName) {
if (!oldTableNonPhysicalColumnIndex.containsKey(
((ModifyColumnName) change).getOldColumnName())) {
ModifyColumnName modify = (ModifyColumnName) change;
schemaChanges.add(
SchemaChange.renameColumn(
modify.getOldColumnName(), modify.getNewColumnName()));
}
return schemaChanges;
} else if (change instanceof ModifyPhysicalColumnType) {
if (!oldTableNonPhysicalColumnIndex.containsKey(
((ModifyPhysicalColumnType) change).getOldColumn().getName())) {
ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType) change;
generateNestedColumnUpdates(
Collections.singletonList(modify.getOldColumn().getName()),
LogicalTypeConversion.toDataType(
modify.getOldColumn().getDataType().getLogicalType()),
LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()),
schemaChanges);
}
return schemaChanges;
} else if (change instanceof ModifyColumnPosition) {
if (!oldTableNonPhysicalColumnIndex.containsKey(
((ModifyColumnPosition) change).getOldColumn().getName())) {
ModifyColumnPosition modify = (ModifyColumnPosition) change;
SchemaChange.Move move =
getMove(modify.getNewPosition(), modify.getNewColumn().getName());
schemaChanges.add(SchemaChange.updateColumnPosition(move));
}
return schemaChanges;
} else if (change instanceof TableChange.ModifyColumnComment) {
if (!oldTableNonPhysicalColumnIndex.containsKey(
((ModifyColumnComment) change).getOldColumn().getName())) {
ModifyColumnComment modify = (ModifyColumnComment) change;
schemaChanges.add(
SchemaChange.updateColumnComment(
modify.getNewColumn().getName(), modify.getNewComment()));
}
return schemaChanges;
} else if (change instanceof ModifyWatermark) {
ModifyWatermark modify = (ModifyWatermark) change;
setWatermarkOptions(modify.getNewWatermark(), schemaChanges);
return schemaChanges;
} else if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
String key = setOption.getKey();
String value = setOption.getValue();
SchemaManager.checkAlterTablePath(key);
if (COMMENT_PROP.equals(key)) {
schemaChanges.add(SchemaChange.updateComment(value));
} else {
schemaChanges.add(SchemaChange.setOption(key, value));
}
return schemaChanges;
} else if (change instanceof ResetOption) {
ResetOption resetOption = (ResetOption) change;
String key = resetOption.getKey();
if (COMMENT_PROP.equals(key)) {
schemaChanges.add(SchemaChange.updateComment(null));
} else {
schemaChanges.add(SchemaChange.removeOption(resetOption.getKey()));
}
return schemaChanges;
} else if (change instanceof TableChange.ModifyColumn) {
// let non-physical column handle by option
if (oldTableNonPhysicalColumnIndex.containsKey(
((TableChange.ModifyColumn) change).getOldColumn().getName())
&& !(((TableChange.ModifyColumn) change).getNewColumn()
instanceof Column.PhysicalColumn)) {
return schemaChanges;
} else {
throw new UnsupportedOperationException(
"Change is not supported: " + change.getClass());
}
} else if (change instanceof MaterializedTableChange
&& handleMaterializedTableChange(change, schemaChanges)) {
return schemaChanges;
}
throw new UnsupportedOperationException("Change is not supported: " + change.getClass());
}