in paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java [298:483]
public TableSchema generateTableSchema(
TableSchema oldTableSchema, List<SchemaChange> changes, LazyField<Boolean> hasSnapshots)
throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {
Map<String, String> oldOptions = new HashMap<>(oldTableSchema.options());
Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());
List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());
String newComment = oldTableSchema.comment();
for (SchemaChange change : changes) {
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
if (hasSnapshots.get()) {
checkAlterTableOption(
setOption.key(),
oldOptions.get(setOption.key()),
setOption.value(),
false);
}
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
RemoveOption removeOption = (RemoveOption) change;
if (hasSnapshots.get()) {
checkResetTableOption(removeOption.key());
}
newOptions.remove(removeOption.key());
} else if (change instanceof UpdateComment) {
UpdateComment updateComment = (UpdateComment) change;
newComment = updateComment.comment();
} else if (change instanceof AddColumn) {
AddColumn addColumn = (AddColumn) change;
SchemaChange.Move move = addColumn.move();
Preconditions.checkArgument(
addColumn.dataType().isNullable(),
"Column %s cannot specify NOT NULL in the %s table.",
String.join(".", addColumn.fieldNames()),
identifierFromPath(tableRoot.toString(), true, branch).getFullName());
int id = highestFieldId.incrementAndGet();
DataType dataType = ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
new NestedColumnModifier(addColumn.fieldNames()) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnAlreadyExistException {
assertColumnNotExists(newFields, fieldName);
DataField dataField =
new DataField(id, fieldName, dataType, addColumn.description());
// key: name ; value : index
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < newFields.size(); i++) {
map.put(newFields.get(i).name(), i);
}
if (null != move) {
if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
newFields.add(0, dataField);
} else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
int fieldIndex = map.get(move.referenceFieldName());
newFields.add(fieldIndex + 1, dataField);
}
} else {
newFields.add(dataField);
}
}
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename");
new NestedColumnModifier(rename.fieldNames()) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException,
Catalog.ColumnAlreadyExistException {
assertColumnExists(newFields, fieldName);
assertColumnNotExists(newFields, rename.newName());
for (int i = 0; i < newFields.size(); i++) {
DataField field = newFields.get(i);
if (!field.name().equals(fieldName)) {
continue;
}
DataField newField =
new DataField(
field.id(),
rename.newName(),
field.type(),
field.description());
newFields.set(i, newField);
return;
}
}
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
dropColumnValidation(oldTableSchema, drop);
new NestedColumnModifier(drop.fieldNames()) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException {
assertColumnExists(newFields, fieldName);
newFields.removeIf(f -> f.name().equals(fieldName));
if (newFields.isEmpty()) {
throw new IllegalArgumentException("Cannot drop all fields in table");
}
}
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
updateNestedColumn(
newFields,
update.fieldNames(),
(field) -> {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
targetType = targetType.copy(field.type().isNullable());
}
checkState(
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
&& CastExecutors.resolve(field.type(), targetType)
!= null,
String.format(
"Column type %s[%s] cannot be converted to %s without loosing information.",
field.name(), field.type(), targetType));
return new DataField(
field.id(), field.name(), targetType, field.description());
});
} else if (change instanceof UpdateColumnNullability) {
UpdateColumnNullability update = (UpdateColumnNullability) change;
if (update.fieldNames().length == 1
&& update.newNullability()
&& oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
throw new UnsupportedOperationException(
"Cannot change nullability of primary key");
}
updateNestedColumn(
newFields,
update.fieldNames(),
(field) ->
new DataField(
field.id(),
field.name(),
field.type().copy(update.newNullability()),
field.description()));
} else if (change instanceof UpdateColumnComment) {
UpdateColumnComment update = (UpdateColumnComment) change;
updateNestedColumn(
newFields,
update.fieldNames(),
(field) ->
new DataField(
field.id(),
field.name(),
field.type(),
update.newDescription()));
} else if (change instanceof UpdateColumnPosition) {
UpdateColumnPosition update = (UpdateColumnPosition) change;
SchemaChange.Move move = update.move();
applyMove(newFields, move);
} else {
throw new UnsupportedOperationException("Unsupported change: " + change.getClass());
}
}
// We change TableSchema to Schema, because we want to deal with primary-key and
// partition in options.
Schema newSchema =
new Schema(
newFields,
oldTableSchema.partitionKeys(),
applyNotNestedColumnRename(
oldTableSchema.primaryKeys(),
Iterables.filter(changes, RenameColumn.class)),
applySchemaChanges(newOptions, changes),
newComment);
return new TableSchema(
oldTableSchema.id() + 1,
newSchema.fields(),
highestFieldId.get(),
newSchema.partitionKeys(),
newSchema.primaryKeys(),
newSchema.options(),
newSchema.comment());
}