in flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/catalog/MySqlTypeMapper.java [103:203]
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
throws SQLException {
String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
String columnName = metadata.getColumnName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (mysqlType) {
case MYSQL_BIT:
return DataTypes.BOOLEAN();
case MYSQL_TINYBLOB:
case MYSQL_MEDIUMBLOB:
case MYSQL_BLOB:
case MYSQL_LONGBLOB:
case MYSQL_VARBINARY:
case MYSQL_BINARY:
// BINARY is not supported in MySqlDialect now.
// VARBINARY(n) is not supported in MySqlDialect when 'n' is not equals to
// Integer.MAX_VALUE. Please see
// org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect#supportedTypes and
// org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect#validate for
// more
// details.
return DataTypes.BYTES();
case MYSQL_TINYINT:
return DataTypes.TINYINT();
case MYSQL_TINYINT_UNSIGNED:
case MYSQL_SMALLINT:
return DataTypes.SMALLINT();
case MYSQL_SMALLINT_UNSIGNED:
case MYSQL_MEDIUMINT:
case MYSQL_MEDIUMINT_UNSIGNED:
case MYSQL_INT:
case MYSQL_INTEGER:
return DataTypes.INT();
case MYSQL_INT_UNSIGNED:
case MYSQL_INTEGER_UNSIGNED:
case MYSQL_BIGINT:
return DataTypes.BIGINT();
case MYSQL_BIGINT_UNSIGNED:
return DataTypes.DECIMAL(20, 0);
case MYSQL_DECIMAL:
return DataTypes.DECIMAL(precision, scale);
case MYSQL_DECIMAL_UNSIGNED:
checkMaxPrecision(tablePath, columnName, precision);
return DataTypes.DECIMAL(precision + 1, scale);
case MYSQL_FLOAT:
return DataTypes.FLOAT();
case MYSQL_FLOAT_UNSIGNED:
LOG.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
return DataTypes.FLOAT();
case MYSQL_DOUBLE:
return DataTypes.DOUBLE();
case MYSQL_DOUBLE_UNSIGNED:
LOG.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
return DataTypes.DOUBLE();
case MYSQL_CHAR:
return DataTypes.CHAR(precision);
case MYSQL_VARCHAR:
case MYSQL_TINYTEXT:
case MYSQL_MEDIUMTEXT:
case MYSQL_TEXT:
return DataTypes.VARCHAR(precision);
case MYSQL_JSON:
return DataTypes.STRING();
case MYSQL_LONGTEXT:
LOG.warn(
"Type '{}' has a maximum precision of 536870911 in MySQL. "
+ "Due to limitations in the Flink type system, "
+ "the precision will be set to 2147483647.",
MYSQL_LONGTEXT);
return DataTypes.STRING();
case MYSQL_DATE:
return DataTypes.DATE();
case MYSQL_TIME:
return isExplicitPrecision(precision, RAW_TIME_LENGTH)
? DataTypes.TIME(precision - RAW_TIME_LENGTH - 1)
: DataTypes.TIME(0);
case MYSQL_DATETIME:
case MYSQL_TIMESTAMP:
boolean explicitPrecision = isExplicitPrecision(precision, RAW_TIMESTAMP_LENGTH);
if (explicitPrecision) {
int p = precision - RAW_TIMESTAMP_LENGTH - 1;
if (p <= 6 && p >= 0) {
return DataTypes.TIMESTAMP(p);
}
return p > 6 ? DataTypes.TIMESTAMP(6) : DataTypes.TIMESTAMP(0);
}
return DataTypes.TIMESTAMP(0);
case MYSQL_YEAR:
case MYSQL_GEOMETRY:
case MYSQL_UNKNOWN:
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support MySQL type '%s' on column '%s' in MySQL version %s, driver version %s yet.",
mysqlType, jdbcColumnName, databaseVersion, driverVersion));
}
}