in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java [659:790]
/**
 * Recursively compares {@code oldType} with {@code newType} and appends to {@code
 * schemaChanges} the changes that turn the former into the latter.
 *
 * <ul>
 *   <li>Row types: after verifying that fields present in both rows keep their relative
 *       order, emits drops for removed fields, then walks the new fields in order, emitting
 *       an add (positioned first or after its predecessor) for each new field and a comment
 *       update plus a recursive comparison for each retained field.
 *   <li>Array types: recurses into the element type under the path segment {@code "element"}.
 *   <li>Map types: requires equal key types and recurses into the value type under the path
 *       segment {@code "value"}.
 *   <li>Any other type: emits a type update when the types differ ignoring nullability.
 * </ul>
 *
 * <p>In every case a nullability update is appended last when the nullability of the two
 * types differs. A precondition check fails when a row, array or map column is changed to a
 * different kind of type, when a map key type changes, or when retained row fields are
 * reordered.
 *
 * @param fieldNames path of the column being compared, outermost name first
 * @param oldType current type of the column
 * @param newType requested type of the column
 * @param schemaChanges output list that receives the generated changes
 */
private void generateNestedColumnUpdates(
        List<String> fieldNames,
        org.apache.paimon.types.DataType oldType,
        org.apache.paimon.types.DataType newType,
        List<SchemaChange> schemaChanges) {
    String columnPath = String.join(".", fieldNames);
    DataTypeRoot oldRoot = oldType.getTypeRoot();

    if (oldRoot == DataTypeRoot.ROW) {
        Preconditions.checkArgument(
                newType.getTypeRoot() == DataTypeRoot.ROW,
                "Column %s can only be updated to row type, and cannot be updated to %s type",
                columnPath,
                newType.getTypeRoot());
        org.apache.paimon.types.RowType previousRow = (org.apache.paimon.types.RowType) oldType;
        org.apache.paimon.types.RowType updatedRow = (org.apache.paimon.types.RowType) newType;
        List<DataField> previousFields = previousRow.getFields();
        List<DataField> updatedFields = updatedRow.getFields();

        // Index every old field by name so its original position can be looked up.
        Map<String, Integer> previousPositions = new HashMap<>();
        for (int pos = 0; pos < previousRow.getFieldCount(); pos++) {
            previousPositions.put(previousFields.get(pos).name(), pos);
        }

        // Retained fields must appear in strictly increasing old position; this is validated
        // up front so that nothing is appended to schemaChanges when the order is violated.
        int highestSeen = -1;
        String highestSeenName = "";
        for (DataField candidate : updatedFields) {
            String candidateName = candidate.name();
            Integer previousPos = previousPositions.get(candidateName);
            if (previousPos == null) {
                // Newly added field, not subject to the ordering constraint.
                continue;
            }
            Preconditions.checkState(
                    highestSeen < previousPos,
                    "Order of existing fields in column %s must be kept the same. "
                            + "However, field %s and %s have changed their orders.",
                    columnPath,
                    highestSeenName,
                    candidateName);
            highestSeen = previousPos;
            highestSeenName = candidateName;
        }

        // Drops are emitted first, in old-field order.
        Set<String> survivingNames = new HashSet<>(updatedRow.getFieldNames());
        for (String previousName : previousRow.getFieldNames()) {
            if (survivingNames.contains(previousName)) {
                continue;
            }
            List<String> droppedPath = new ArrayList<>(fieldNames);
            droppedPath.add(previousName);
            schemaChanges.add(SchemaChange.dropColumn(droppedPath.toArray(new String[0])));
        }

        // Adds and updates follow, in new-field order.
        for (int pos = 0; pos < updatedRow.getFieldCount(); pos++) {
            DataField updatedField = updatedFields.get(pos);
            String childName = updatedField.name();
            List<String> childPath = new ArrayList<>(fieldNames);
            childPath.add(childName);

            Integer previousPos = previousPositions.get(childName);
            if (previousPos != null) {
                // Retained field: refresh the comment if it changed, then compare the types.
                DataField previousField = previousFields.get(previousPos);
                if (!Objects.equals(previousField.description(), updatedField.description())) {
                    schemaChanges.add(
                            SchemaChange.updateColumnComment(
                                    childPath.toArray(new String[0]),
                                    updatedField.description()));
                }
                generateNestedColumnUpdates(
                        childPath, previousField.type(), updatedField.type(), schemaChanges);
                continue;
            }

            // New field: place it first, or right after its predecessor in the new row.
            SchemaChange.Move placement =
                    pos == 0
                            ? SchemaChange.Move.first(childName)
                            : SchemaChange.Move.after(
                                    childName, updatedFields.get(pos - 1).name());
            schemaChanges.add(
                    SchemaChange.addColumn(
                            childPath.toArray(new String[0]),
                            updatedField.type(),
                            updatedField.description(),
                            placement));
        }
    } else if (oldRoot == DataTypeRoot.ARRAY) {
        Preconditions.checkArgument(
                newType.getTypeRoot() == DataTypeRoot.ARRAY,
                "Column %s can only be updated to array type, and cannot be updated to %s type",
                columnPath,
                newType);
        org.apache.paimon.types.ArrayType previousArray =
                (org.apache.paimon.types.ArrayType) oldType;
        org.apache.paimon.types.ArrayType updatedArray =
                (org.apache.paimon.types.ArrayType) newType;
        // "element" is a placeholder path segment standing for the array's element.
        List<String> elementPath = new ArrayList<>(fieldNames);
        elementPath.add("element");
        generateNestedColumnUpdates(
                elementPath,
                previousArray.getElementType(),
                updatedArray.getElementType(),
                schemaChanges);
    } else if (oldRoot == DataTypeRoot.MAP) {
        Preconditions.checkArgument(
                newType.getTypeRoot() == DataTypeRoot.MAP,
                "Column %s can only be updated to map type, and cannot be updated to %s type",
                columnPath,
                newType);
        org.apache.paimon.types.MapType previousMap = (org.apache.paimon.types.MapType) oldType;
        org.apache.paimon.types.MapType updatedMap = (org.apache.paimon.types.MapType) newType;
        // Only the value side of a map may change; the key type has to stay equal.
        Preconditions.checkArgument(
                previousMap.getKeyType().equals(updatedMap.getKeyType()),
                "Cannot update key type of column %s from %s type to %s type",
                columnPath,
                previousMap.getKeyType(),
                updatedMap.getKeyType());
        // "value" is a placeholder path segment standing for the map's value.
        List<String> valuePath = new ArrayList<>(fieldNames);
        valuePath.add("value");
        generateNestedColumnUpdates(
                valuePath, previousMap.getValueType(), updatedMap.getValueType(), schemaChanges);
    } else if (!oldType.equalsIgnoreNullable(newType)) {
        // Any other type: a difference beyond nullability becomes a type update.
        schemaChanges.add(
                SchemaChange.updateColumnType(
                        fieldNames.toArray(new String[0]), newType, false));
    }

    // Nullability is handled separately from the type itself and is always emitted last.
    boolean nullableNow = newType.isNullable();
    if (oldType.isNullable() != nullableNow) {
        schemaChanges.add(
                SchemaChange.updateColumnNullability(
                        fieldNames.toArray(new String[0]), nullableNow));
    }
}