public static DataType toDataType()

in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java [181:335]


    public static DataType toDataType(
            String type,
            @Nullable Integer length,
            @Nullable Integer scale,
            TypeMapping typeMapping) {
        if (typeMapping.containsMode(TO_STRING)) {
            return DataTypes.STRING();
        }

        switch (type.toUpperCase()) {
            case BIT:
                if (length == null || length == 1) {
                    return DataTypes.BOOLEAN();
                } else {
                    return DataTypes.BINARY((length + 7) / 8);
                }
            case BOOLEAN:
            case BOOL:
                return DataTypes.BOOLEAN();
            case TINYINT:
                // MySQL haven't boolean type, it uses tinyint(1) to represents boolean type.
                // User should not use tinyint(1) to store number although jdbc url parameter
                // tinyInt1isBit=false can help change the return value, it's not a general way.
                // Mybatis and mysql-connector-java map tinyint(1) to boolean by default, we behave
                // the same way by default. To store number (-128~127), user can set the type
                // mapping option 'tinyint1-not-bool' then tinyint(1) will be mapped to tinyint.
                return length != null && length == 1 && !typeMapping.containsMode(TINYINT1_NOT_BOOL)
                        ? DataTypes.BOOLEAN()
                        : DataTypes.TINYINT();
            case TINYINT_UNSIGNED:
            case TINYINT_UNSIGNED_ZEROFILL:
            case SMALLINT:
                return DataTypes.SMALLINT();
            case SMALLINT_UNSIGNED:
            case SMALLINT_UNSIGNED_ZEROFILL:
            case INT:
            case INTEGER:
            case MEDIUMINT:
            case YEAR:
                return DataTypes.INT();
            case INT_UNSIGNED:
            case INTEGER_UNSIGNED:
            case INT_UNSIGNED_ZEROFILL:
            case INTEGER_UNSIGNED_ZEROFILL:
            case MEDIUMINT_UNSIGNED:
            case MEDIUMINT_UNSIGNED_ZEROFILL:
            case BIGINT:
            case LONG:
                return DataTypes.BIGINT();
            case BIGINT_UNSIGNED:
            case BIGINT_UNSIGNED_ZEROFILL:
            case SERIAL:
                return typeMapping.containsMode(BIGINT_UNSIGNED_TO_BIGINT)
                        ? DataTypes.BIGINT()
                        : DataTypes.DECIMAL(20, 0);
            case FLOAT:
            case FLOAT_UNSIGNED:
            case FLOAT_UNSIGNED_ZEROFILL:
                return DataTypes.FLOAT();
            case REAL:
            case REAL_UNSIGNED:
            case REAL_UNSIGNED_ZEROFILL:
            case DOUBLE:
            case DOUBLE_UNSIGNED:
            case DOUBLE_UNSIGNED_ZEROFILL:
            case DOUBLE_PRECISION:
            case DOUBLE_PRECISION_UNSIGNED:
            case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
                return DataTypes.DOUBLE();
            case NUMERIC:
            case NUMERIC_UNSIGNED:
            case NUMERIC_UNSIGNED_ZEROFILL:
            case FIXED:
            case FIXED_UNSIGNED:
            case FIXED_UNSIGNED_ZEROFILL:
            case DECIMAL:
            case DECIMAL_UNSIGNED:
            case DECIMAL_UNSIGNED_ZEROFILL:
                return length != null && length <= 38
                        ? DataTypes.DECIMAL(length, scale != null && scale >= 0 ? scale : 0)
                        : DataTypes.STRING();
            case DATE:
                return DataTypes.DATE();
            case TIME:
                return DataTypes.TIME();
            case DATETIME:
            case TIMESTAMP:
                if (length == null || length <= 0) {
                    // default precision is 0
                    // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
                    return DataTypes.TIMESTAMP(0);
                } else if (length >= JDBC_TIMESTAMP_BASE_LENGTH) {
                    if (length > JDBC_TIMESTAMP_BASE_LENGTH + 1) {
                        // Timestamp with a fraction of seconds.
                        // For example "2023-03-23 17:20:00.01".
                        // The decimal point will occupy 1 character.
                        return DataTypes.TIMESTAMP(length - JDBC_TIMESTAMP_BASE_LENGTH - 1);
                    } else {
                        return DataTypes.TIMESTAMP(0);
                    }
                } else if (length <= TimestampType.MAX_PRECISION) {
                    return DataTypes.TIMESTAMP(length);
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported length "
                                    + length
                                    + " for MySQL DATETIME and TIMESTAMP types");
                }
                // because tidb ddl event does not contain field precision
            case CHAR:
                return length == null || length == 0 || typeMapping.containsMode(CHAR_TO_STRING)
                        ? DataTypes.STRING()
                        : DataTypes.CHAR(length);
            case VARCHAR:
                return length == null || length == 0 || typeMapping.containsMode(CHAR_TO_STRING)
                        ? DataTypes.STRING()
                        : DataTypes.VARCHAR(length);
            case TINYTEXT:
            case TEXT:
            case MEDIUMTEXT:
            case JSON:
            case ENUM:
            case GEOMETRY:
            case POINT:
            case LINESTRING:
            case POLYGON:
            case MULTIPOINT:
            case MULTILINESTRING:
            case MULTIPOLYGON:
            case GEOMETRYCOLLECTION:
            case STRING:
                return DataTypes.STRING();
                // MySQL BINARY and VARBINARY are stored as bytes in JSON. We convert them to
                // DataTypes.VARBINARY to retain the length information
            case BINARY:
            case VARBINARY:
                return length == null || length == 0
                        ? DataTypes.VARBINARY(VarBinaryType.DEFAULT_LENGTH)
                        : DataTypes.VARBINARY(length);
            case LONGTEXT:
                return typeMapping.containsMode(LONGTEXT_TO_BYTES)
                        ? DataTypes.BYTES()
                        : DataTypes.STRING();
            case TINYBLOB:
            case BLOB:
            case MEDIUMBLOB:
            case LONGBLOB:
                return DataTypes.BYTES();
            case SET:
                return DataTypes.ARRAY(DataTypes.STRING());
            default:
                throw new UnsupportedOperationException(
                        String.format("Don't support MySQL type '%s' yet.", type));
        }
    }