paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java [120:242]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        if (schemaChange instanceof SchemaChange.AddColumn) {
            try {
                catalog.alterTable(identifier, schemaChange, false);
            } catch (Catalog.ColumnAlreadyExistException e) {
                // This is normal. For example when a table is split into multiple database tables,
                // all these tables will be added the same column. However, schemaManager can't
                // handle duplicated column adds, so we just catch the exception and log it.
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Failed to perform SchemaChange.AddColumn {}, "
                                    + "possibly due to duplicated column name",
                            schemaChange,
                            e);
                }
            }
        } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
            SchemaChange.UpdateColumnType updateColumnType =
                    (SchemaChange.UpdateColumnType) schemaChange;
            TableSchema schema =
                    schemaManager
                            .latest()
                            .orElseThrow(
                                    () ->
                                            new RuntimeException(
                                                    "Table does not exist. This is unexpected."));
            int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
            Preconditions.checkState(
                    idx >= 0,
                    "Field name "
                            + updateColumnType.fieldName()
                            + " does not exist in table. This is unexpected.");
            DataType oldType = schema.fields().get(idx).type();
            DataType newType = updateColumnType.newDataType();
            switch (canConvert(oldType, newType)) {
                case CONVERT:
                    catalog.alterTable(identifier, schemaChange, false);
                    break;
                case EXCEPTION:
                    throw new UnsupportedOperationException(
                            String.format(
                                    "Cannot convert field %s from type %s to %s",
                                    updateColumnType.fieldName(), oldType, newType));
            }
        } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
            catalog.alterTable(identifier, schemaChange, false);
        } else {
            throw new UnsupportedOperationException(
                    "Unsupported schema change class "
                            + schemaChange.getClass().getName()
                            + ", content "
                            + schemaChange);
        }
    }

    private static final List<DataTypeRoot> STRING_TYPES =
            Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
    private static final List<DataTypeRoot> BINARY_TYPES =
            Arrays.asList(DataTypeRoot.BINARY, DataTypeRoot.VARBINARY);
    private static final List<DataTypeRoot> INTEGER_TYPES =
            Arrays.asList(
                    DataTypeRoot.TINYINT,
                    DataTypeRoot.SMALLINT,
                    DataTypeRoot.INTEGER,
                    DataTypeRoot.BIGINT);
    private static final List<DataTypeRoot> FLOATING_POINT_TYPES =
            Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);

    public static ConvertAction canConvert(DataType oldType, DataType newType) {
        if (oldType.equalsIgnoreNullable(newType)) {
            return ConvertAction.CONVERT;
        }

        int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
        int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getLength(oldType) <= DataTypeChecks.getLength(newType)
                    ? ConvertAction.CONVERT
                    : ConvertAction.IGNORE;
        }

        oldIdx = BINARY_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = BINARY_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getLength(oldType) <= DataTypeChecks.getLength(newType)
                    ? ConvertAction.CONVERT
                    : ConvertAction.IGNORE;
        }

        oldIdx = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = INTEGER_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return oldIdx <= newIdx ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }

        oldIdx = FLOATING_POINT_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = FLOATING_POINT_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return oldIdx <= newIdx ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }

        return ConvertAction.EXCEPTION;
    }

    /**
     * Return type of {@link MultiTableUpdatedDataFieldsProcessFunction#canConvert(DataType,
     * DataType)}. This enum indicates the action to perform.
     */
    public enum ConvertAction {

        /** {@code oldType} can be converted to {@code newType}. */
        CONVERT,

        /**
         * {@code oldType} and {@code newType} belongs to the same type family, but old type has
         * higher precision than new type. Ignore this convert request.
         */
        IGNORE,

        /**
         * {@code oldType} and {@code newType} belongs to different type family. Throw an exception
         * indicating that this convert request cannot be handled.
         */
        EXCEPTION
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java [136:258]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        if (schemaChange instanceof SchemaChange.AddColumn) {
            try {
                catalog.alterTable(identifier, schemaChange, false);
            } catch (Catalog.ColumnAlreadyExistException e) {
                // This is normal. For example when a table is split into multiple database tables,
                // all these tables will be added the same column. However, schemaManager can't
                // handle duplicated column adds, so we just catch the exception and log it.
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Failed to perform SchemaChange.AddColumn {}, "
                                    + "possibly due to duplicated column name",
                            schemaChange,
                            e);
                }
            }
        } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
            SchemaChange.UpdateColumnType updateColumnType =
                    (SchemaChange.UpdateColumnType) schemaChange;
            TableSchema schema =
                    schemaManager
                            .latest()
                            .orElseThrow(
                                    () ->
                                            new RuntimeException(
                                                    "Table does not exist. This is unexpected."));
            int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
            Preconditions.checkState(
                    idx >= 0,
                    "Field name "
                            + updateColumnType.fieldName()
                            + " does not exist in table. This is unexpected.");
            DataType oldType = schema.fields().get(idx).type();
            DataType newType = updateColumnType.newDataType();
            switch (canConvert(oldType, newType)) {
                case CONVERT:
                    catalog.alterTable(identifier, schemaChange, false);
                    break;
                case EXCEPTION:
                    throw new UnsupportedOperationException(
                            String.format(
                                    "Cannot convert field %s from type %s to %s",
                                    updateColumnType.fieldName(), oldType, newType));
            }
        } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
            catalog.alterTable(identifier, schemaChange, false);
        } else {
            throw new UnsupportedOperationException(
                    "Unsupported schema change class "
                            + schemaChange.getClass().getName()
                            + ", content "
                            + schemaChange);
        }
    }

    private static final List<DataTypeRoot> STRING_TYPES =
            Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
    private static final List<DataTypeRoot> BINARY_TYPES =
            Arrays.asList(DataTypeRoot.BINARY, DataTypeRoot.VARBINARY);
    private static final List<DataTypeRoot> INTEGER_TYPES =
            Arrays.asList(
                    DataTypeRoot.TINYINT,
                    DataTypeRoot.SMALLINT,
                    DataTypeRoot.INTEGER,
                    DataTypeRoot.BIGINT);
    private static final List<DataTypeRoot> FLOATING_POINT_TYPES =
            Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);

    public static ConvertAction canConvert(DataType oldType, DataType newType) {
        if (oldType.equalsIgnoreNullable(newType)) {
            return ConvertAction.CONVERT;
        }

        int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
        int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getLength(oldType) <= DataTypeChecks.getLength(newType)
                    ? ConvertAction.CONVERT
                    : ConvertAction.IGNORE;
        }

        oldIdx = BINARY_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = BINARY_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return DataTypeChecks.getLength(oldType) <= DataTypeChecks.getLength(newType)
                    ? ConvertAction.CONVERT
                    : ConvertAction.IGNORE;
        }

        oldIdx = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = INTEGER_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return oldIdx <= newIdx ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }

        oldIdx = FLOATING_POINT_TYPES.indexOf(oldType.getTypeRoot());
        newIdx = FLOATING_POINT_TYPES.indexOf(newType.getTypeRoot());
        if (oldIdx >= 0 && newIdx >= 0) {
            return oldIdx <= newIdx ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }

        return ConvertAction.EXCEPTION;
    }

    /**
     * Return type of {@link UpdatedDataFieldsProcessFunction#canConvert(DataType, DataType)}. This
     * enum indicates the action to perform.
     */
    public enum ConvertAction {

        /** {@code oldType} can be converted to {@code newType}. */
        CONVERT,

        /**
         * {@code oldType} and {@code newType} belongs to the same type family, but old type has
         * higher precision than new type. Ignore this convert request.
         */
        IGNORE,

        /**
         * {@code oldType} and {@code newType} belongs to different type family. Throw an exception
         * indicating that this convert request cannot be handled.
         */
        EXCEPTION
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



