public TableSchema generateTableSchema()

in paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java [298:483]


    /**
     * Builds the next {@link TableSchema} by applying {@code changes}, in iteration order, on top
     * of {@code oldTableSchema}.
     *
     * <p>The method works on copies of the old schema's option map and top-level field list, and
     * tracks the highest field id and the table comment while the changes are applied. The
     * returned schema gets the id {@code oldTableSchema.id() + 1}.
     *
     * @param oldTableSchema the schema the changes are applied to
     * @param changes the schema changes to apply, processed in iteration order
     * @param hasSnapshots lazily evaluated flag; it is only read for {@code SetOption} and {@code
     *     RemoveOption} changes, and when it yields {@code true} the option change is first passed
     *     to {@code checkAlterTableOption} / {@code checkResetTableOption}
     * @return the new table schema containing the updated fields, options, comment and highest
     *     field id
     * @throws Catalog.ColumnAlreadyExistException propagated from the column modifiers when an
     *     added or renamed column collides with an existing name
     * @throws Catalog.ColumnNotExistException propagated from the column modifiers when a renamed
     *     or dropped column, or the reference column of an {@code AFTER} move, cannot be found
     * @throws UnsupportedOperationException if a change type is not supported, if a top-level
     *     primary key column is made nullable, or if an added column carries a move type other
     *     than {@code FIRST} or {@code AFTER}
     * @throws IllegalArgumentException if dropping a column would leave no fields
     */
    public TableSchema generateTableSchema(
            TableSchema oldTableSchema, List<SchemaChange> changes, LazyField<Boolean> hasSnapshots)
            throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {
        // oldOptions stays untouched so option checks always compare against the original value.
        Map<String, String> oldOptions = new HashMap<>(oldTableSchema.options());
        Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());
        List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
        AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());
        String newComment = oldTableSchema.comment();
        for (SchemaChange change : changes) {
            if (change instanceof SetOption) {
                SetOption setOption = (SetOption) change;
                if (hasSnapshots.get()) {
                    checkAlterTableOption(
                            setOption.key(),
                            oldOptions.get(setOption.key()),
                            setOption.value(),
                            false);
                }
                newOptions.put(setOption.key(), setOption.value());
            } else if (change instanceof RemoveOption) {
                RemoveOption removeOption = (RemoveOption) change;
                if (hasSnapshots.get()) {
                    checkResetTableOption(removeOption.key());
                }
                newOptions.remove(removeOption.key());
            } else if (change instanceof UpdateComment) {
                UpdateComment updateComment = (UpdateComment) change;
                newComment = updateComment.comment();
            } else if (change instanceof AddColumn) {
                AddColumn addColumn = (AddColumn) change;
                SchemaChange.Move move = addColumn.move();
                Preconditions.checkArgument(
                        addColumn.dataType().isNullable(),
                        "Column %s cannot specify NOT NULL in the %s table.",
                        String.join(".", addColumn.fieldNames()),
                        identifierFromPath(tableRoot.toString(), true, branch).getFullName());
                // The new column takes the next id; the same counter is then handed to
                // ReassignFieldId for the column's data type.
                int id = highestFieldId.incrementAndGet();
                DataType dataType = ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);

                new NestedColumnModifier(addColumn.fieldNames()) {
                    @Override
                    protected void updateLastColumn(List<DataField> newFields, String fieldName)
                            throws Catalog.ColumnAlreadyExistException,
                                    Catalog.ColumnNotExistException {
                        assertColumnNotExists(newFields, fieldName);

                        DataField dataField =
                                new DataField(id, fieldName, dataType, addColumn.description());

                        if (null == move) {
                            // No explicit position requested: append at the end.
                            newFields.add(dataField);
                        } else if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
                            newFields.add(0, dataField);
                        } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
                            // Report a missing reference column through the declared checked
                            // exception instead of a NullPointerException raised by unboxing
                            // the absent map entry below.
                            assertColumnExists(newFields, move.referenceFieldName());

                            // key: name ; value : index
                            Map<String, Integer> map = new HashMap<>();
                            for (int i = 0; i < newFields.size(); i++) {
                                map.put(newFields.get(i).name(), i);
                            }
                            int fieldIndex = map.get(move.referenceFieldName());
                            newFields.add(fieldIndex + 1, dataField);
                        } else {
                            // Any other move type used to fall through without inserting the
                            // field at all; fail loudly rather than silently dropping it.
                            throw new UnsupportedOperationException(
                                    "Unsupported move type for adding a column: " + move.type());
                        }
                    }
                }.updateIntermediateColumn(newFields, 0);
            } else if (change instanceof RenameColumn) {
                RenameColumn rename = (RenameColumn) change;
                assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename");
                new NestedColumnModifier(rename.fieldNames()) {
                    @Override
                    protected void updateLastColumn(List<DataField> newFields, String fieldName)
                            throws Catalog.ColumnNotExistException,
                                    Catalog.ColumnAlreadyExistException {
                        assertColumnExists(newFields, fieldName);
                        assertColumnNotExists(newFields, rename.newName());
                        for (int i = 0; i < newFields.size(); i++) {
                            DataField field = newFields.get(i);
                            if (!field.name().equals(fieldName)) {
                                continue;
                            }

                            // Keep id, type and description; only the name changes.
                            DataField newField =
                                    new DataField(
                                            field.id(),
                                            rename.newName(),
                                            field.type(),
                                            field.description());
                            newFields.set(i, newField);
                            return;
                        }
                    }
                }.updateIntermediateColumn(newFields, 0);
            } else if (change instanceof DropColumn) {
                DropColumn drop = (DropColumn) change;
                dropColumnValidation(oldTableSchema, drop);
                new NestedColumnModifier(drop.fieldNames()) {
                    @Override
                    protected void updateLastColumn(List<DataField> newFields, String fieldName)
                            throws Catalog.ColumnNotExistException {
                        assertColumnExists(newFields, fieldName);
                        newFields.removeIf(f -> f.name().equals(fieldName));
                        if (newFields.isEmpty()) {
                            throw new IllegalArgumentException("Cannot drop all fields in table");
                        }
                    }
                }.updateIntermediateColumn(newFields, 0);
            } else if (change instanceof UpdateColumnType) {
                UpdateColumnType update = (UpdateColumnType) change;
                assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
                updateNestedColumn(
                        newFields,
                        update.fieldNames(),
                        (field) -> {
                            DataType targetType = update.newDataType();
                            if (update.keepNullability()) {
                                // Carry over the nullability of the existing column type.
                                targetType = targetType.copy(field.type().isNullable());
                            }
                            // Both the cast rules and an actual cast executor must accept the
                            // conversion before the type change is allowed.
                            checkState(
                                    DataTypeCasts.supportsExplicitCast(field.type(), targetType)
                                            && CastExecutors.resolve(field.type(), targetType)
                                                    != null,
                                    String.format(
                                            "Column type %s[%s] cannot be converted to %s without loosing information.",
                                            field.name(), field.type(), targetType));
                            return new DataField(
                                    field.id(), field.name(), targetType, field.description());
                        });
            } else if (change instanceof UpdateColumnNullability) {
                UpdateColumnNullability update = (UpdateColumnNullability) change;
                // Only top-level columns are checked here: a primary key column must not be
                // switched to nullable.
                if (update.fieldNames().length == 1
                        && update.newNullability()
                        && oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
                    throw new UnsupportedOperationException(
                            "Cannot change nullability of primary key");
                }
                updateNestedColumn(
                        newFields,
                        update.fieldNames(),
                        (field) ->
                                new DataField(
                                        field.id(),
                                        field.name(),
                                        field.type().copy(update.newNullability()),
                                        field.description()));
            } else if (change instanceof UpdateColumnComment) {
                UpdateColumnComment update = (UpdateColumnComment) change;
                updateNestedColumn(
                        newFields,
                        update.fieldNames(),
                        (field) ->
                                new DataField(
                                        field.id(),
                                        field.name(),
                                        field.type(),
                                        update.newDescription()));
            } else if (change instanceof UpdateColumnPosition) {
                UpdateColumnPosition update = (UpdateColumnPosition) change;
                SchemaChange.Move move = update.move();
                applyMove(newFields, move);
            } else {
                throw new UnsupportedOperationException("Unsupported change: " + change.getClass());
            }
        }

        // We change TableSchema to Schema, because we want to deal with primary-key and
        // partition in options.
        Schema newSchema =
                new Schema(
                        newFields,
                        oldTableSchema.partitionKeys(),
                        applyNotNestedColumnRename(
                                oldTableSchema.primaryKeys(),
                                Iterables.filter(changes, RenameColumn.class)),
                        applySchemaChanges(newOptions, changes),
                        newComment);

        return new TableSchema(
                oldTableSchema.id() + 1,
                newSchema.fields(),
                highestFieldId.get(),
                newSchema.partitionKeys(),
                newSchema.primaryKeys(),
                newSchema.options(),
                newSchema.comment());
    }