private void generateNestedColumnUpdates()

in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java [659:790]


    /**
     * Recursively compares {@code oldType} with {@code newType} and appends to {@code
     * schemaChanges} the changes that describe the difference between them.
     *
     * <p>How the comparison proceeds depends on the type root of {@code oldType}:
     *
     * <ul>
     *   <li>ROW: {@code newType} must also be a row. Fields present in both rows must keep their
     *       relative order. Drops of removed fields are emitted first; then, walking the new
     *       fields in order, an add (positioned first or after the preceding new field) is
     *       emitted for each unknown field, and a comment update (if the description differs)
     *       followed by a recursive comparison is performed for each known field.
     *   <li>ARRAY: {@code newType} must also be an array; the element types are compared
     *       recursively under the dummy path segment {@code "element"}.
     *   <li>MAP: {@code newType} must also be a map with an equal key type; the value types are
     *       compared recursively under the dummy path segment {@code "value"}.
     *   <li>Anything else: a type update is emitted unless the two types are equal ignoring
     *       nullability.
     * </ul>
     *
     * <p>In every case a nullability update is appended last when the nullability of the two
     * types differs.
     *
     * @param fieldNames path of the column being compared, from the top-level column down to the
     *     current nested field; it is not modified by this method
     * @param oldType current type at that path
     * @param newType requested type at that path
     * @param schemaChanges output list that receives the generated changes, in emission order
     */
    private void generateNestedColumnUpdates(
            List<String> fieldNames,
            org.apache.paimon.types.DataType oldType,
            org.apache.paimon.types.DataType newType,
            List<SchemaChange> schemaChanges) {
        String joinedNames = String.join(".", fieldNames);
        DataTypeRoot oldRoot = oldType.getTypeRoot();

        if (oldRoot == DataTypeRoot.ROW) {
            DataTypeRoot newRoot = newType.getTypeRoot();
            Preconditions.checkArgument(
                    newRoot == DataTypeRoot.ROW,
                    "Column %s can only be updated to row type, and cannot be updated to %s type",
                    joinedNames,
                    newRoot);
            org.apache.paimon.types.RowType previousRow =
                    (org.apache.paimon.types.RowType) oldType;
            org.apache.paimon.types.RowType targetRow = (org.apache.paimon.types.RowType) newType;
            List<DataField> previousFields = previousRow.getFields();
            List<DataField> targetFields = targetRow.getFields();

            // Remember where each field sits in the old row.
            Map<String, Integer> positionInOld = new HashMap<>();
            int position = 0;
            for (DataField existing : previousFields) {
                positionInOld.put(existing.name(), position++);
            }

            // Fields that survive must appear in strictly increasing old position.
            int highestSeen = -1;
            String precedingName = "";
            for (DataField candidate : targetFields) {
                String candidateName = candidate.name();
                Integer oldPosition = positionInOld.get(candidateName);
                if (oldPosition == null) {
                    // newly added field, not subject to the order constraint
                    continue;
                }
                Preconditions.checkState(
                        highestSeen < oldPosition,
                        "Order of existing fields in column %s must be kept the same. "
                                + "However, field %s and %s have changed their orders.",
                        joinedNames,
                        precedingName,
                        candidateName);
                highestSeen = oldPosition;
                precedingName = candidateName;
            }

            // Emit drops for every old field that no longer exists in the new row.
            Set<String> survivingNames = new HashSet<>(targetRow.getFieldNames());
            for (String existingName : previousRow.getFieldNames()) {
                if (survivingNames.contains(existingName)) {
                    continue;
                }
                List<String> droppedPath = new ArrayList<>(fieldNames);
                droppedPath.add(existingName);
                schemaChanges.add(SchemaChange.dropColumn(droppedPath.toArray(new String[0])));
            }

            // Walk the new row in order, emitting adds and updates.
            DataField predecessor = null;
            for (DataField target : targetFields) {
                String targetName = target.name();
                List<String> targetPath = new ArrayList<>(fieldNames);
                targetPath.add(targetName);

                Integer oldPosition = positionInOld.get(targetName);
                if (oldPosition != null) {
                    // Known field: sync its comment, then descend into its type.
                    DataField source = previousFields.get(oldPosition);
                    if (!Objects.equals(source.description(), target.description())) {
                        schemaChanges.add(
                                SchemaChange.updateColumnComment(
                                        targetPath.toArray(new String[0]), target.description()));
                    }
                    generateNestedColumnUpdates(
                            targetPath, source.type(), target.type(), schemaChanges);
                } else {
                    // Unknown field: add it right after the field preceding it in the new row,
                    // or at the front when it is the very first field.
                    SchemaChange.Move placement =
                            predecessor == null
                                    ? SchemaChange.Move.first(targetName)
                                    : SchemaChange.Move.after(targetName, predecessor.name());
                    schemaChanges.add(
                            SchemaChange.addColumn(
                                    targetPath.toArray(new String[0]),
                                    target.type(),
                                    target.description(),
                                    placement));
                }
                predecessor = target;
            }
        } else if (oldRoot == DataTypeRoot.ARRAY) {
            Preconditions.checkArgument(
                    newType.getTypeRoot() == DataTypeRoot.ARRAY,
                    "Column %s can only be updated to array type, and cannot be updated to %s type",
                    joinedNames,
                    newType);
            org.apache.paimon.types.ArrayType previousArray =
                    (org.apache.paimon.types.ArrayType) oldType;
            org.apache.paimon.types.ArrayType targetArray =
                    (org.apache.paimon.types.ArrayType) newType;
            // "element" is a dummy path segment standing for the array's element
            List<String> elementPath = new ArrayList<>(fieldNames);
            elementPath.add("element");
            generateNestedColumnUpdates(
                    elementPath,
                    previousArray.getElementType(),
                    targetArray.getElementType(),
                    schemaChanges);
        } else if (oldRoot == DataTypeRoot.MAP) {
            Preconditions.checkArgument(
                    newType.getTypeRoot() == DataTypeRoot.MAP,
                    "Column %s can only be updated to map type, and cannot be updated to %s type",
                    joinedNames,
                    newType);
            org.apache.paimon.types.MapType previousMap = (org.apache.paimon.types.MapType) oldType;
            org.apache.paimon.types.MapType targetMap = (org.apache.paimon.types.MapType) newType;
            // Only the value side of a map may evolve; the key type has to stay equal.
            Preconditions.checkArgument(
                    previousMap.getKeyType().equals(targetMap.getKeyType()),
                    "Cannot update key type of column %s from %s type to %s type",
                    joinedNames,
                    previousMap.getKeyType(),
                    targetMap.getKeyType());
            // "value" is a dummy path segment standing for the map's value
            List<String> valuePath = new ArrayList<>(fieldNames);
            valuePath.add("value");
            generateNestedColumnUpdates(
                    valuePath, previousMap.getValueType(), targetMap.getValueType(), schemaChanges);
        } else if (!oldType.equalsIgnoreNullable(newType)) {
            // Non-nested type: a plain type update is enough; nullability is handled below.
            String[] columnPath = fieldNames.toArray(new String[0]);
            schemaChanges.add(SchemaChange.updateColumnType(columnPath, newType, false));
        }

        // Nullability is compared separately so it is emitted for nested and plain types alike.
        boolean targetNullable = newType.isNullable();
        if (oldType.isNullable() != targetNullable) {
            schemaChanges.add(
                    SchemaChange.updateColumnNullability(
                            fieldNames.toArray(new String[0]), targetNullable));
        }
    }