in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java [284:370]
/**
 * Translates one {@link TableChange} into the {@link SchemaChange}s that express it.
 *
 * <p>Most change types produce exactly one schema change. A physical column type modification
 * may produce two (a nullability update followed by a type update), dropping a watermark
 * produces three option removals, and adding or modifying a watermark contributes whatever
 * {@code setWatermarkOptions} appends to the list.
 *
 * @param change the table change to translate
 * @return a newly created list with the resulting schema changes, in the order they were
 *     produced
 * @throws UnsupportedOperationException if {@code change} is not one of the handled change types
 */
private List<SchemaChange> toSchemaChange(TableChange change) {
    List<SchemaChange> result = new ArrayList<>();

    if (change instanceof AddColumn) {
        AddColumn addColumn = (AddColumn) change;
        // An absent column comment is passed on as null.
        String columnComment = addColumn.getColumn().getComment().orElse(null);
        SchemaChange.Move position =
                getMove(addColumn.getPosition(), addColumn.getColumn().getName());
        SchemaChange added =
                SchemaChange.addColumn(
                        addColumn.getColumn().getName(),
                        LogicalTypeConversion.toDataType(
                                addColumn.getColumn().getDataType().getLogicalType()),
                        columnComment,
                        position);
        result.add(added);
    } else if (change instanceof AddWatermark) {
        AddWatermark addWatermark = (AddWatermark) change;
        setWatermarkOptions(addWatermark.getWatermark(), result);
    } else if (change instanceof DropColumn) {
        String droppedName = ((DropColumn) change).getColumnName();
        result.add(SchemaChange.dropColumn(droppedName));
    } else if (change instanceof DropWatermark) {
        // Dropping the watermark removes the three option keys under the watermark prefix.
        String prefix = compoundKey(SCHEMA, WATERMARK, 0);
        String rowtimeKey = compoundKey(prefix, WATERMARK_ROWTIME);
        result.add(SchemaChange.removeOption(rowtimeKey));
        String strategyExprKey = compoundKey(prefix, WATERMARK_STRATEGY_EXPR);
        result.add(SchemaChange.removeOption(strategyExprKey));
        String strategyTypeKey = compoundKey(prefix, WATERMARK_STRATEGY_DATA_TYPE);
        result.add(SchemaChange.removeOption(strategyTypeKey));
    } else if (change instanceof ModifyColumnName) {
        ModifyColumnName rename = (ModifyColumnName) change;
        String oldName = rename.getOldColumnName();
        String newName = rename.getNewColumnName();
        result.add(SchemaChange.renameColumn(oldName, newName));
    } else if (change instanceof ModifyPhysicalColumnType) {
        ModifyPhysicalColumnType retype = (ModifyPhysicalColumnType) change;
        LogicalType targetType = retype.getNewType().getLogicalType();
        LogicalType currentType = retype.getOldColumn().getDataType().getLogicalType();
        // A nullability flip is emitted as its own change, ahead of the type update.
        boolean nullabilityChanged = targetType.isNullable() != currentType.isNullable();
        if (nullabilityChanged) {
            result.add(
                    SchemaChange.updateColumnNullability(
                            retype.getNewColumn().getName(), targetType.isNullable()));
        }
        result.add(
                SchemaChange.updateColumnType(
                        retype.getOldColumn().getName(),
                        LogicalTypeConversion.toDataType(targetType)));
    } else if (change instanceof ModifyColumnPosition) {
        ModifyColumnPosition reposition = (ModifyColumnPosition) change;
        SchemaChange.Move position =
                getMove(reposition.getNewPosition(), reposition.getNewColumn().getName());
        result.add(SchemaChange.updateColumnPosition(position));
    } else if (change instanceof ModifyColumnComment) {
        ModifyColumnComment recomment = (ModifyColumnComment) change;
        result.add(
                SchemaChange.updateColumnComment(
                        recomment.getNewColumn().getName(), recomment.getNewComment()));
    } else if (change instanceof ModifyWatermark) {
        ModifyWatermark modifyWatermark = (ModifyWatermark) change;
        setWatermarkOptions(modifyWatermark.getNewWatermark(), result);
    } else if (change instanceof SetOption) {
        SetOption set = (SetOption) change;
        String optionKey = set.getKey();
        String optionValue = set.getValue();
        // The key is checked by SchemaManager before the option change is recorded.
        SchemaManager.checkAlterTablePath(optionKey);
        result.add(SchemaChange.setOption(optionKey, optionValue));
    } else if (change instanceof ResetOption) {
        String optionKey = ((ResetOption) change).getKey();
        result.add(SchemaChange.removeOption(optionKey));
    } else {
        throw new UnsupportedOperationException(
                "Change is not supported: " + change.getClass());
    }

    return result;
}