private List toSchemaChange()

in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java [284:370]


    private List<SchemaChange> toSchemaChange(TableChange change) {
        List<SchemaChange> schemaChanges = new ArrayList<>();
        if (change instanceof AddColumn) {
            AddColumn add = (AddColumn) change;
            String comment = add.getColumn().getComment().orElse(null);
            SchemaChange.Move move = getMove(add.getPosition(), add.getColumn().getName());
            schemaChanges.add(
                    SchemaChange.addColumn(
                            add.getColumn().getName(),
                            LogicalTypeConversion.toDataType(
                                    add.getColumn().getDataType().getLogicalType()),
                            comment,
                            move));
            return schemaChanges;
        } else if (change instanceof AddWatermark) {
            AddWatermark add = (AddWatermark) change;
            setWatermarkOptions(add.getWatermark(), schemaChanges);
            return schemaChanges;
        } else if (change instanceof DropColumn) {
            DropColumn drop = (DropColumn) change;
            schemaChanges.add(SchemaChange.dropColumn(drop.getColumnName()));
            return schemaChanges;
        } else if (change instanceof DropWatermark) {
            String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
            schemaChanges.add(
                    SchemaChange.removeOption(compoundKey(watermarkPrefix, WATERMARK_ROWTIME)));
            schemaChanges.add(
                    SchemaChange.removeOption(
                            compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR)));
            schemaChanges.add(
                    SchemaChange.removeOption(
                            compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE)));
            return schemaChanges;
        } else if (change instanceof ModifyColumnName) {
            ModifyColumnName modify = (ModifyColumnName) change;
            schemaChanges.add(
                    SchemaChange.renameColumn(
                            modify.getOldColumnName(), modify.getNewColumnName()));
            return schemaChanges;
        } else if (change instanceof ModifyPhysicalColumnType) {
            ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType) change;
            LogicalType newColumnType = modify.getNewType().getLogicalType();
            LogicalType oldColumnType = modify.getOldColumn().getDataType().getLogicalType();
            if (newColumnType.isNullable() != oldColumnType.isNullable()) {
                schemaChanges.add(
                        SchemaChange.updateColumnNullability(
                                modify.getNewColumn().getName(), newColumnType.isNullable()));
            }
            schemaChanges.add(
                    SchemaChange.updateColumnType(
                            modify.getOldColumn().getName(),
                            LogicalTypeConversion.toDataType(newColumnType)));
            return schemaChanges;
        } else if (change instanceof ModifyColumnPosition) {
            ModifyColumnPosition modify = (ModifyColumnPosition) change;
            SchemaChange.Move move =
                    getMove(modify.getNewPosition(), modify.getNewColumn().getName());
            schemaChanges.add(SchemaChange.updateColumnPosition(move));
            return schemaChanges;
        } else if (change instanceof TableChange.ModifyColumnComment) {
            ModifyColumnComment modify = (ModifyColumnComment) change;
            schemaChanges.add(
                    SchemaChange.updateColumnComment(
                            modify.getNewColumn().getName(), modify.getNewComment()));
            return schemaChanges;
        } else if (change instanceof ModifyWatermark) {
            ModifyWatermark modify = (ModifyWatermark) change;
            setWatermarkOptions(modify.getNewWatermark(), schemaChanges);
            return schemaChanges;
        } else if (change instanceof SetOption) {
            SetOption setOption = (SetOption) change;
            String key = setOption.getKey();
            String value = setOption.getValue();

            SchemaManager.checkAlterTablePath(key);

            schemaChanges.add(SchemaChange.setOption(key, value));
            return schemaChanges;
        } else if (change instanceof ResetOption) {
            ResetOption resetOption = (ResetOption) change;
            schemaChanges.add(SchemaChange.removeOption(resetOption.getKey()));
            return schemaChanges;
        } else {
            throw new UnsupportedOperationException(
                    "Change is not supported: " + change.getClass());
        }
    }