in paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java [190:389]
public TableSchema commitChanges(List<SchemaChange> changes)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
while (true) {
TableSchema schema =
latest().orElseThrow(
() ->
new Catalog.TableNotExistException(
fromPath(tableRoot.getPath(), true)));
Map<String, String> newOptions = new HashMap<>(schema.options());
List<DataField> newFields = new ArrayList<>(schema.fields());
AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId());
for (SchemaChange change : changes) {
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
checkAlterTableOption(setOption.key());
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
RemoveOption removeOption = (RemoveOption) change;
checkAlterTableOption(removeOption.key());
newOptions.remove(removeOption.key());
} else if (change instanceof AddColumn) {
AddColumn addColumn = (AddColumn) change;
SchemaChange.Move move = addColumn.move();
if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) {
throw new Catalog.ColumnAlreadyExistException(
fromPath(tableRoot.getPath(), true), addColumn.fieldName());
}
Preconditions.checkArgument(
addColumn.dataType().isNullable(),
"ADD COLUMN cannot specify NOT NULL.");
int id = highestFieldId.incrementAndGet();
DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
DataField dataField =
new DataField(
id, addColumn.fieldName(), dataType, addColumn.description());
// key: name ; value : index
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < newFields.size(); i++) {
map.put(newFields.get(i).name(), i);
}
if (null != move) {
if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
newFields.add(0, dataField);
} else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
int fieldIndex = map.get(move.referenceFieldName());
newFields.add(fieldIndex + 1, dataField);
}
} else {
newFields.add(dataField);
}
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
validateNotPrimaryAndPartitionKey(schema, rename.fieldName());
if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
fromPath(tableRoot.getPath(), true), rename.fieldName());
}
updateNestedColumn(
newFields,
new String[] {rename.fieldName()},
0,
(field) ->
new DataField(
field.id(),
rename.newName(),
field.type(),
field.description()));
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
validateNotPrimaryAndPartitionKey(schema, drop.fieldName());
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn) change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
fromPath(tableRoot.getPath(), true), drop.fieldName());
}
if (newFields.isEmpty()) {
throw new IllegalArgumentException("Cannot drop all fields in table");
}
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
if (schema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
"Cannot update partition column [%s] type in the table[%s].",
update.fieldName(), tableRoot.getName()));
}
updateColumn(
newFields,
update.fieldName(),
(field) -> {
checkState(
DataTypeCasts.supportsExplicitCast(
field.type(), update.newDataType())
&& CastExecutors.resolve(
field.type(), update.newDataType())
!= null,
String.format(
"Column type %s[%s] cannot be converted to %s without loosing information.",
field.name(), field.type(), update.newDataType()));
AtomicInteger dummyId = new AtomicInteger(0);
if (dummyId.get() != 0) {
throw new RuntimeException(
String.format(
"Update column to nested row type '%s' is not supported.",
update.newDataType()));
}
return new DataField(
field.id(),
field.name(),
update.newDataType(),
field.description());
});
} else if (change instanceof UpdateColumnNullability) {
UpdateColumnNullability update = (UpdateColumnNullability) change;
if (update.fieldNames().length == 1
&& update.newNullability()
&& schema.primaryKeys().contains(update.fieldNames()[0])) {
throw new UnsupportedOperationException(
"Cannot change nullability of primary key");
}
updateNestedColumn(
newFields,
update.fieldNames(),
0,
(field) ->
new DataField(
field.id(),
field.name(),
field.type().copy(update.newNullability()),
field.description()));
} else if (change instanceof UpdateColumnComment) {
UpdateColumnComment update = (UpdateColumnComment) change;
updateNestedColumn(
newFields,
update.fieldNames(),
0,
(field) ->
new DataField(
field.id(),
field.name(),
field.type(),
update.newDescription()));
} else if (change instanceof UpdateColumnPosition) {
UpdateColumnPosition update = (UpdateColumnPosition) change;
SchemaChange.Move move = update.move();
// key: name ; value : index
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < newFields.size(); i++) {
map.put(newFields.get(i).name(), i);
}
int fieldIndex = map.get(move.fieldName());
int refIndex = 0;
if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
checkMoveIndexEqual(move, fieldIndex, refIndex);
newFields.add(refIndex, newFields.remove(fieldIndex));
} else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
refIndex = map.get(move.referenceFieldName());
checkMoveIndexEqual(move, fieldIndex, refIndex);
if (fieldIndex > refIndex) {
newFields.add(refIndex + 1, newFields.remove(fieldIndex));
} else {
newFields.add(refIndex, newFields.remove(fieldIndex));
}
}
} else {
throw new UnsupportedOperationException(
"Unsupported change: " + change.getClass());
}
}
TableSchema newSchema =
new TableSchema(
schema.id() + 1,
newFields,
highestFieldId.get(),
schema.partitionKeys(),
schema.primaryKeys(),
newOptions,
schema.comment());
try {
boolean success = commit(newSchema);
if (success) {
return newSchema;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}