public static DataType toDataType()

in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java [148:284]


    public static DataType toDataType(
            String type,
            @Nullable Integer length,
            @Nullable Integer scale,
            Boolean tinyInt1ToBool) {
        switch (type.toUpperCase()) {
            case BIT:
            case BOOLEAN:
            case BOOL:
                return DataTypes.BOOLEAN();
            case TINYINT:
                // MySQL haven't boolean type, it uses tinyint(1) to represents boolean type
                // user should not use tinyint(1) to store number although jdbc url parameter
                // tinyInt1isBit=false can help change the return value, it's not a general way.
                // mybatis and mysql-connector-java map tinyint(1) to boolean by default, we behave
                // the same way by default. To store number (-128~127), we can set the parameter
                // tinyInt1ToByte (option 'mysql.converter.tinyint1-to-bool') to false, then
                // tinyint(1)
                // will be mapped to TinyInt.
                return length != null && length == 1 && tinyInt1ToBool
                        ? DataTypes.BOOLEAN()
                        : DataTypes.TINYINT();
            case TINYINT_UNSIGNED:
            case TINYINT_UNSIGNED_ZEROFILL:
            case SMALLINT:
                return DataTypes.SMALLINT();
            case SMALLINT_UNSIGNED:
            case SMALLINT_UNSIGNED_ZEROFILL:
            case INT:
            case MEDIUMINT:
            case YEAR:
                return DataTypes.INT();
            case INT_UNSIGNED:
            case INT_UNSIGNED_ZEROFILL:
            case MEDIUMINT_UNSIGNED:
            case MEDIUMINT_UNSIGNED_ZEROFILL:
            case BIGINT:
                return DataTypes.BIGINT();
            case BIGINT_UNSIGNED:
            case BIGINT_UNSIGNED_ZEROFILL:
            case SERIAL:
                return DataTypes.DECIMAL(20, 0);
            case FLOAT:
            case FLOAT_UNSIGNED:
            case FLOAT_UNSIGNED_ZEROFILL:
                return DataTypes.FLOAT();
            case REAL:
            case REAL_UNSIGNED:
            case REAL_UNSIGNED_ZEROFILL:
            case DOUBLE:
            case DOUBLE_UNSIGNED:
            case DOUBLE_UNSIGNED_ZEROFILL:
            case DOUBLE_PRECISION:
            case DOUBLE_PRECISION_UNSIGNED:
            case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
                return DataTypes.DOUBLE();
            case NUMERIC:
            case NUMERIC_UNSIGNED:
            case NUMERIC_UNSIGNED_ZEROFILL:
            case FIXED:
            case FIXED_UNSIGNED:
            case FIXED_UNSIGNED_ZEROFILL:
            case DECIMAL:
            case DECIMAL_UNSIGNED:
            case DECIMAL_UNSIGNED_ZEROFILL:
                return length != null && length <= 38
                        ? DataTypes.DECIMAL(length, scale != null && scale >= 0 ? scale : 0)
                        : DataTypes.STRING();
            case DATE:
                return DataTypes.DATE();
            case TIME:
                return DataTypes.TIME();
            case DATETIME:
            case TIMESTAMP:
                if (length == null) {
                    // default precision is 0
                    // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
                    return DataTypes.TIMESTAMP(0);
                } else if (length >= JDBC_TIMESTAMP_BASE_LENGTH) {
                    if (length > JDBC_TIMESTAMP_BASE_LENGTH + 1) {
                        // Timestamp with a fraction of seconds.
                        // For example "2023-03-23 17:20:00.01".
                        // The decimal point will occupy 1 character.
                        return DataTypes.TIMESTAMP(length - JDBC_TIMESTAMP_BASE_LENGTH - 1);
                    } else {
                        return DataTypes.TIMESTAMP(0);
                    }
                } else if (length >= 0 && length <= TimestampType.MAX_PRECISION) {
                    return DataTypes.TIMESTAMP(length);
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported length "
                                    + length
                                    + " for MySQL DATETIME and TIMESTAMP types");
                }
                // because tidb ddl event does not contain field precision
            case CHAR:
                return length == null || length == 0 ? DataTypes.STRING() : DataTypes.CHAR(length);
            case VARCHAR:
                return length == null || length == 0
                        ? DataTypes.STRING()
                        : DataTypes.VARCHAR(length);
            case TINYTEXT:
            case TEXT:
            case MEDIUMTEXT:
            case LONGTEXT:
            case JSON:
            case ENUM:
            case GEOMETRY:
            case POINT:
            case LINESTRING:
            case POLYGON:
            case MULTIPOINT:
            case MULTILINESTRING:
            case MULTIPOLYGON:
            case GEOMETRYCOLLECTION:
                return DataTypes.STRING();
            case BINARY:
                return length == null || length == 0
                        ? DataTypes.BINARY(BinaryType.DEFAULT_LENGTH)
                        : DataTypes.BINARY(length);
            case VARBINARY:
                return length == null || length == 0
                        ? DataTypes.VARBINARY(VarBinaryType.DEFAULT_LENGTH)
                        : DataTypes.VARBINARY(length);
            case TINYBLOB:
            case BLOB:
            case MEDIUMBLOB:
            case LONGBLOB:
                return DataTypes.BYTES();
            case SET:
                return DataTypes.ARRAY(DataTypes.STRING());
            default:
                throw new UnsupportedOperationException(
                        String.format("Don't support MySQL type '%s' yet.", type));
        }
    }