private List toSchemaChange()

in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java [537:657]


    /**
     * Converts one {@link TableChange} into the list of {@link SchemaChange}s that represent it.
     *
     * <p>The returned list may be empty: adding a non-physical column, or dropping / renaming /
     * retyping / repositioning / re-commenting a column whose name is a key of {@code
     * oldTableNonPhysicalColumnIndex}, produces no schema change here (presumably such columns
     * are persisted through table options by the caller - verify against the call site).
     *
     * <p>The order of the type checks matters: the specific column modifications are tested
     * before the generic {@code TableChange.ModifyColumn} branch, which only accepts a change
     * from a known non-physical column to a column that is not a {@code Column.PhysicalColumn}.
     *
     * @param change the table change to translate
     * @param oldTableNonPhysicalColumnIndex map keyed by the names of the non-physical columns of
     *     the old table; only key membership is consulted in this method
     * @return the schema changes equivalent to {@code change}, possibly empty
     * @throws UnsupportedOperationException if the change type is not handled
     */
    private List<SchemaChange> toSchemaChange(
            TableChange change, Map<String, Integer> oldTableNonPhysicalColumnIndex) {
        List<SchemaChange> result = new ArrayList<>();

        if (change instanceof AddColumn) {
            AddColumn addColumn = (AddColumn) change;
            Column column = addColumn.getColumn();
            if (!column.isPhysical()) {
                // Non-physical columns do not produce a schema change.
                return result;
            }
            String comment = column.getComment().orElse(null);
            SchemaChange.Move move = getMove(addColumn.getPosition(), column.getName());
            result.add(
                    SchemaChange.addColumn(
                            column.getName(),
                            LogicalTypeConversion.toDataType(
                                    column.getDataType().getLogicalType()),
                            comment,
                            move));
            return result;
        }

        if (change instanceof AddWatermark) {
            setWatermarkOptions(((AddWatermark) change).getWatermark(), result);
            return result;
        }

        if (change instanceof DropColumn) {
            String columnName = ((DropColumn) change).getColumnName();
            if (!oldTableNonPhysicalColumnIndex.containsKey(columnName)) {
                result.add(SchemaChange.dropColumn(columnName));
            }
            return result;
        }

        if (change instanceof DropWatermark) {
            // The watermark is stored as three options sharing one prefix; remove each of them.
            String prefix = compoundKey(SCHEMA, WATERMARK, 0);
            result.add(SchemaChange.removeOption(compoundKey(prefix, WATERMARK_ROWTIME)));
            result.add(SchemaChange.removeOption(compoundKey(prefix, WATERMARK_STRATEGY_EXPR)));
            result.add(
                    SchemaChange.removeOption(compoundKey(prefix, WATERMARK_STRATEGY_DATA_TYPE)));
            return result;
        }

        if (change instanceof ModifyColumnName) {
            ModifyColumnName rename = (ModifyColumnName) change;
            String oldName = rename.getOldColumnName();
            if (!oldTableNonPhysicalColumnIndex.containsKey(oldName)) {
                result.add(SchemaChange.renameColumn(oldName, rename.getNewColumnName()));
            }
            return result;
        }

        if (change instanceof ModifyPhysicalColumnType) {
            ModifyPhysicalColumnType retype = (ModifyPhysicalColumnType) change;
            String oldName = retype.getOldColumn().getName();
            if (!oldTableNonPhysicalColumnIndex.containsKey(oldName)) {
                generateNestedColumnUpdates(
                        Collections.singletonList(oldName),
                        LogicalTypeConversion.toDataType(
                                retype.getOldColumn().getDataType().getLogicalType()),
                        LogicalTypeConversion.toDataType(retype.getNewType().getLogicalType()),
                        result);
            }
            return result;
        }

        if (change instanceof ModifyColumnPosition) {
            ModifyColumnPosition reposition = (ModifyColumnPosition) change;
            if (!oldTableNonPhysicalColumnIndex.containsKey(
                    reposition.getOldColumn().getName())) {
                SchemaChange.Move move =
                        getMove(reposition.getNewPosition(), reposition.getNewColumn().getName());
                result.add(SchemaChange.updateColumnPosition(move));
            }
            return result;
        }

        if (change instanceof TableChange.ModifyColumnComment) {
            ModifyColumnComment recomment = (ModifyColumnComment) change;
            if (!oldTableNonPhysicalColumnIndex.containsKey(recomment.getOldColumn().getName())) {
                result.add(
                        SchemaChange.updateColumnComment(
                                recomment.getNewColumn().getName(), recomment.getNewComment()));
            }
            return result;
        }

        if (change instanceof ModifyWatermark) {
            setWatermarkOptions(((ModifyWatermark) change).getNewWatermark(), result);
            return result;
        }

        if (change instanceof SetOption) {
            SetOption set = (SetOption) change;
            String key = set.getKey();
            String value = set.getValue();

            SchemaManager.checkAlterTablePath(key);

            // The comment property maps to a table comment update rather than a plain option.
            result.add(
                    COMMENT_PROP.equals(key)
                            ? SchemaChange.updateComment(value)
                            : SchemaChange.setOption(key, value));
            return result;
        }

        if (change instanceof ResetOption) {
            String key = ((ResetOption) change).getKey();
            // Resetting the comment property clears the table comment.
            result.add(
                    COMMENT_PROP.equals(key)
                            ? SchemaChange.updateComment(null)
                            : SchemaChange.removeOption(key));
            return result;
        }

        if (change instanceof TableChange.ModifyColumn) {
            TableChange.ModifyColumn modify = (TableChange.ModifyColumn) change;
            // let non-physical column handle by option
            boolean oldIsNonPhysical =
                    oldTableNonPhysicalColumnIndex.containsKey(modify.getOldColumn().getName());
            if (!oldIsNonPhysical || modify.getNewColumn() instanceof Column.PhysicalColumn) {
                throw new UnsupportedOperationException(
                        "Change is not supported: " + change.getClass());
            }
            return result;
        }

        if (change instanceof MaterializedTableChange
                && handleMaterializedTableChange(change, result)) {
            return result;
        }

        throw new UnsupportedOperationException("Change is not supported: " + change.getClass());
    }