in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java [148:284]
/**
 * Maps a MySQL column type name to the corresponding Paimon {@link DataType}.
 *
 * <p>The type name is matched case-insensitively. Unsigned integer types are mapped to the next
 * wider signed type because their value range does not fit into the signed type of the same
 * width; {@code BIGINT UNSIGNED} (up to 20 decimal digits) is mapped to {@code DECIMAL(20, 0)}.
 *
 * @param type MySQL type name, e.g. {@code "INT"} or {@code "varchar"}; must not be null
 * @param length length / precision / display width reported for the column, or {@code null}
 *     when the source does not provide one; its meaning depends on {@code type}
 * @param scale scale of a decimal-family column; {@code null} or a negative value is treated
 *     as 0; ignored for all other types
 * @param tinyInt1ToBool whether {@code TINYINT(1)} is mapped to BOOLEAN instead of TINYINT.
 *     NOTE(review): this boxed value is unboxed when {@code length} is 1, so callers are
 *     expected to pass a non-null value
 * @return the mapped data type
 * @throws UnsupportedOperationException if the type name is not supported, or if the length of
 *     a DATETIME/TIMESTAMP column can be interpreted neither as a precision nor as a JDBC
 *     display width
 */
public static DataType toDataType(
        String type,
        @Nullable Integer length,
        @Nullable Integer scale,
        Boolean tinyInt1ToBool) {
    // Use a locale-independent upper-casing: with the default locale a Turkish/Azeri JVM turns
    // 'i' into a dotted capital 'İ', so names such as "int" or "bigint" would match no case.
    switch (type.toUpperCase(java.util.Locale.ROOT)) {
        case BIT:
        case BOOLEAN:
        case BOOL:
            return DataTypes.BOOLEAN();
        case TINYINT:
            // MySQL has no dedicated boolean type; it uses tinyint(1) to represent booleans.
            // Users should not store numbers in tinyint(1): although the JDBC URL parameter
            // tinyInt1isBit=false can change the returned value, it is not a general solution.
            // MyBatis and mysql-connector-java map tinyint(1) to boolean by default, and we
            // behave the same way by default. To store numbers (-128~127) instead, set the
            // parameter tinyInt1ToBool (option 'mysql.converter.tinyint1-to-bool') to false;
            // tinyint(1) is then mapped to TINYINT.
            return length != null && length == 1 && tinyInt1ToBool
                    ? DataTypes.BOOLEAN()
                    : DataTypes.TINYINT();
        case TINYINT_UNSIGNED:
        case TINYINT_UNSIGNED_ZEROFILL:
        case SMALLINT:
            return DataTypes.SMALLINT();
        case SMALLINT_UNSIGNED:
        case SMALLINT_UNSIGNED_ZEROFILL:
        case INT:
        case MEDIUMINT:
        case YEAR:
            return DataTypes.INT();
        case INT_UNSIGNED:
        case INT_UNSIGNED_ZEROFILL:
        case MEDIUMINT_UNSIGNED:
        case MEDIUMINT_UNSIGNED_ZEROFILL:
        case BIGINT:
            return DataTypes.BIGINT();
        case BIGINT_UNSIGNED:
        case BIGINT_UNSIGNED_ZEROFILL:
        case SERIAL:
            return DataTypes.DECIMAL(20, 0);
        case FLOAT:
        case FLOAT_UNSIGNED:
        case FLOAT_UNSIGNED_ZEROFILL:
            return DataTypes.FLOAT();
        case REAL:
        case REAL_UNSIGNED:
        case REAL_UNSIGNED_ZEROFILL:
        case DOUBLE:
        case DOUBLE_UNSIGNED:
        case DOUBLE_UNSIGNED_ZEROFILL:
        case DOUBLE_PRECISION:
        case DOUBLE_PRECISION_UNSIGNED:
        case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
            return DataTypes.DOUBLE();
        case NUMERIC:
        case NUMERIC_UNSIGNED:
        case NUMERIC_UNSIGNED_ZEROFILL:
        case FIXED:
        case FIXED_UNSIGNED:
        case FIXED_UNSIGNED_ZEROFILL:
        case DECIMAL:
        case DECIMAL_UNSIGNED:
        case DECIMAL_UNSIGNED_ZEROFILL:
            // An unknown precision, or one above 38, falls back to STRING
            // (38 is presumably the largest precision the target DECIMAL supports - confirm).
            return length != null && length <= 38
                    ? DataTypes.DECIMAL(length, scale != null && scale >= 0 ? scale : 0)
                    : DataTypes.STRING();
        case DATE:
            return DataTypes.DATE();
        case TIME:
            return DataTypes.TIME();
        case DATETIME:
        case TIMESTAMP:
            if (length == null) {
                // default precision is 0
                // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
                return DataTypes.TIMESTAMP(0);
            }
            if (length >= JDBC_TIMESTAMP_BASE_LENGTH) {
                // The length is a display width rather than a precision.
                // A timestamp with a fraction of seconds, for example
                // "2023-03-23 17:20:00.01", needs one extra character for the decimal point,
                // so only widths beyond base + 1 carry fractional digits.
                return length > JDBC_TIMESTAMP_BASE_LENGTH + 1
                        ? DataTypes.TIMESTAMP(length - JDBC_TIMESTAMP_BASE_LENGTH - 1)
                        : DataTypes.TIMESTAMP(0);
            }
            if (length >= 0 && length <= TimestampType.MAX_PRECISION) {
                // The length is already the fractional-seconds precision.
                return DataTypes.TIMESTAMP(length);
            }
            throw new UnsupportedOperationException(
                    "Unsupported length "
                            + length
                            + " for MySQL DATETIME and TIMESTAMP types");
        case CHAR:
            // The length may be missing because a TiDB DDL event does not contain the field
            // precision; fall back to STRING in that case.
            return length == null || length == 0 ? DataTypes.STRING() : DataTypes.CHAR(length);
        case VARCHAR:
            // Same fallback as CHAR when no usable length is available.
            return length == null || length == 0
                    ? DataTypes.STRING()
                    : DataTypes.VARCHAR(length);
        case TINYTEXT:
        case TEXT:
        case MEDIUMTEXT:
        case LONGTEXT:
        case JSON:
        case ENUM:
        case GEOMETRY:
        case POINT:
        case LINESTRING:
        case POLYGON:
        case MULTIPOINT:
        case MULTILINESTRING:
        case MULTIPOLYGON:
        case GEOMETRYCOLLECTION:
            return DataTypes.STRING();
        case BINARY:
            return length == null || length == 0
                    ? DataTypes.BINARY(BinaryType.DEFAULT_LENGTH)
                    : DataTypes.BINARY(length);
        case VARBINARY:
            return length == null || length == 0
                    ? DataTypes.VARBINARY(VarBinaryType.DEFAULT_LENGTH)
                    : DataTypes.VARBINARY(length);
        case TINYBLOB:
        case BLOB:
        case MEDIUMBLOB:
        case LONGBLOB:
            return DataTypes.BYTES();
        case SET:
            // A SET column is represented as an array of strings.
            return DataTypes.ARRAY(DataTypes.STRING());
        default:
            throw new UnsupportedOperationException(
                    String.format("Don't support MySQL type '%s' yet.", type));
    }
}