public DataType mapping()

in flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlTypeMapper.java [103:203]


    public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
            throws SQLException {
        String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
        String columnName = metadata.getColumnName(colIndex);
        int precision = metadata.getPrecision(colIndex);
        int scale = metadata.getScale(colIndex);

        switch (mysqlType) {
            case MYSQL_BIT:
                return DataTypes.BOOLEAN();
            case MYSQL_TINYBLOB:
            case MYSQL_MEDIUMBLOB:
            case MYSQL_BLOB:
            case MYSQL_LONGBLOB:
            case MYSQL_VARBINARY:
            case MYSQL_BINARY:
                // BINARY is not supported in MySqlDialect now.
                // VARBINARY(n) is not supported in MySqlDialect when 'n' is not equals to
                // Integer.MAX_VALUE. Please see
                // org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect#supportedTypes and
                // org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect#validate for
                // more
                // details.
                return DataTypes.BYTES();
            case MYSQL_TINYINT:
                return DataTypes.TINYINT();
            case MYSQL_TINYINT_UNSIGNED:
            case MYSQL_SMALLINT:
                return DataTypes.SMALLINT();
            case MYSQL_SMALLINT_UNSIGNED:
            case MYSQL_MEDIUMINT:
            case MYSQL_MEDIUMINT_UNSIGNED:
            case MYSQL_INT:
            case MYSQL_INTEGER:
                return DataTypes.INT();
            case MYSQL_INT_UNSIGNED:
            case MYSQL_INTEGER_UNSIGNED:
            case MYSQL_BIGINT:
                return DataTypes.BIGINT();
            case MYSQL_BIGINT_UNSIGNED:
                return DataTypes.DECIMAL(20, 0);
            case MYSQL_DECIMAL:
                return DataTypes.DECIMAL(precision, scale);
            case MYSQL_DECIMAL_UNSIGNED:
                checkMaxPrecision(tablePath, columnName, precision);
                return DataTypes.DECIMAL(precision + 1, scale);
            case MYSQL_FLOAT:
                return DataTypes.FLOAT();
            case MYSQL_FLOAT_UNSIGNED:
                LOG.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
                return DataTypes.FLOAT();
            case MYSQL_DOUBLE:
                return DataTypes.DOUBLE();
            case MYSQL_DOUBLE_UNSIGNED:
                LOG.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
                return DataTypes.DOUBLE();
            case MYSQL_CHAR:
                return DataTypes.CHAR(precision);
            case MYSQL_VARCHAR:
            case MYSQL_TINYTEXT:
            case MYSQL_MEDIUMTEXT:
            case MYSQL_TEXT:
                return DataTypes.VARCHAR(precision);
            case MYSQL_JSON:
                return DataTypes.STRING();
            case MYSQL_LONGTEXT:
                LOG.warn(
                        "Type '{}' has a maximum precision of 536870911 in MySQL. "
                                + "Due to limitations in the Flink type system, "
                                + "the precision will be set to 2147483647.",
                        MYSQL_LONGTEXT);
                return DataTypes.STRING();
            case MYSQL_DATE:
                return DataTypes.DATE();
            case MYSQL_TIME:
                return isExplicitPrecision(precision, RAW_TIME_LENGTH)
                        ? DataTypes.TIME(precision - RAW_TIME_LENGTH - 1)
                        : DataTypes.TIME(0);
            case MYSQL_DATETIME:
            case MYSQL_TIMESTAMP:
                boolean explicitPrecision = isExplicitPrecision(precision, RAW_TIMESTAMP_LENGTH);
                if (explicitPrecision) {
                    int p = precision - RAW_TIMESTAMP_LENGTH - 1;
                    if (p <= 6 && p >= 0) {
                        return DataTypes.TIMESTAMP(p);
                    }
                    return p > 6 ? DataTypes.TIMESTAMP(6) : DataTypes.TIMESTAMP(0);
                }
                return DataTypes.TIMESTAMP(0);

            case MYSQL_YEAR:
            case MYSQL_GEOMETRY:
            case MYSQL_UNKNOWN:
            default:
                final String jdbcColumnName = metadata.getColumnName(colIndex);
                throw new UnsupportedOperationException(
                        String.format(
                                "Doesn't support MySQL type '%s' on column '%s' in MySQL version %s, driver version %s yet.",
                                mysqlType, jdbcColumnName, databaseVersion, driverVersion));
        }
    }