in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java [181:335]
public static DataType toDataType(
String type,
@Nullable Integer length,
@Nullable Integer scale,
TypeMapping typeMapping) {
if (typeMapping.containsMode(TO_STRING)) {
return DataTypes.STRING();
}
switch (type.toUpperCase()) {
case BIT:
if (length == null || length == 1) {
return DataTypes.BOOLEAN();
} else {
return DataTypes.BINARY((length + 7) / 8);
}
case BOOLEAN:
case BOOL:
return DataTypes.BOOLEAN();
case TINYINT:
// MySQL haven't boolean type, it uses tinyint(1) to represents boolean type.
// User should not use tinyint(1) to store number although jdbc url parameter
// tinyInt1isBit=false can help change the return value, it's not a general way.
// Mybatis and mysql-connector-java map tinyint(1) to boolean by default, we behave
// the same way by default. To store number (-128~127), user can set the type
// mapping option 'tinyint1-not-bool' then tinyint(1) will be mapped to tinyint.
return length != null && length == 1 && !typeMapping.containsMode(TINYINT1_NOT_BOOL)
? DataTypes.BOOLEAN()
: DataTypes.TINYINT();
case TINYINT_UNSIGNED:
case TINYINT_UNSIGNED_ZEROFILL:
case SMALLINT:
return DataTypes.SMALLINT();
case SMALLINT_UNSIGNED:
case SMALLINT_UNSIGNED_ZEROFILL:
case INT:
case INTEGER:
case MEDIUMINT:
case YEAR:
return DataTypes.INT();
case INT_UNSIGNED:
case INTEGER_UNSIGNED:
case INT_UNSIGNED_ZEROFILL:
case INTEGER_UNSIGNED_ZEROFILL:
case MEDIUMINT_UNSIGNED:
case MEDIUMINT_UNSIGNED_ZEROFILL:
case BIGINT:
case LONG:
return DataTypes.BIGINT();
case BIGINT_UNSIGNED:
case BIGINT_UNSIGNED_ZEROFILL:
case SERIAL:
return typeMapping.containsMode(BIGINT_UNSIGNED_TO_BIGINT)
? DataTypes.BIGINT()
: DataTypes.DECIMAL(20, 0);
case FLOAT:
case FLOAT_UNSIGNED:
case FLOAT_UNSIGNED_ZEROFILL:
return DataTypes.FLOAT();
case REAL:
case REAL_UNSIGNED:
case REAL_UNSIGNED_ZEROFILL:
case DOUBLE:
case DOUBLE_UNSIGNED:
case DOUBLE_UNSIGNED_ZEROFILL:
case DOUBLE_PRECISION:
case DOUBLE_PRECISION_UNSIGNED:
case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
return DataTypes.DOUBLE();
case NUMERIC:
case NUMERIC_UNSIGNED:
case NUMERIC_UNSIGNED_ZEROFILL:
case FIXED:
case FIXED_UNSIGNED:
case FIXED_UNSIGNED_ZEROFILL:
case DECIMAL:
case DECIMAL_UNSIGNED:
case DECIMAL_UNSIGNED_ZEROFILL:
return length != null && length <= 38
? DataTypes.DECIMAL(length, scale != null && scale >= 0 ? scale : 0)
: DataTypes.STRING();
case DATE:
return DataTypes.DATE();
case TIME:
return DataTypes.TIME();
case DATETIME:
case TIMESTAMP:
if (length == null || length <= 0) {
// default precision is 0
// see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
return DataTypes.TIMESTAMP(0);
} else if (length >= JDBC_TIMESTAMP_BASE_LENGTH) {
if (length > JDBC_TIMESTAMP_BASE_LENGTH + 1) {
// Timestamp with a fraction of seconds.
// For example "2023-03-23 17:20:00.01".
// The decimal point will occupy 1 character.
return DataTypes.TIMESTAMP(length - JDBC_TIMESTAMP_BASE_LENGTH - 1);
} else {
return DataTypes.TIMESTAMP(0);
}
} else if (length <= TimestampType.MAX_PRECISION) {
return DataTypes.TIMESTAMP(length);
} else {
throw new UnsupportedOperationException(
"Unsupported length "
+ length
+ " for MySQL DATETIME and TIMESTAMP types");
}
// because tidb ddl event does not contain field precision
case CHAR:
return length == null || length == 0 || typeMapping.containsMode(CHAR_TO_STRING)
? DataTypes.STRING()
: DataTypes.CHAR(length);
case VARCHAR:
return length == null || length == 0 || typeMapping.containsMode(CHAR_TO_STRING)
? DataTypes.STRING()
: DataTypes.VARCHAR(length);
case TINYTEXT:
case TEXT:
case MEDIUMTEXT:
case JSON:
case ENUM:
case GEOMETRY:
case POINT:
case LINESTRING:
case POLYGON:
case MULTIPOINT:
case MULTILINESTRING:
case MULTIPOLYGON:
case GEOMETRYCOLLECTION:
case STRING:
return DataTypes.STRING();
// MySQL BINARY and VARBINARY are stored as bytes in JSON. We convert them to
// DataTypes.VARBINARY to retain the length information
case BINARY:
case VARBINARY:
return length == null || length == 0
? DataTypes.VARBINARY(VarBinaryType.DEFAULT_LENGTH)
: DataTypes.VARBINARY(length);
case LONGTEXT:
return typeMapping.containsMode(LONGTEXT_TO_BYTES)
? DataTypes.BYTES()
: DataTypes.STRING();
case TINYBLOB:
case BLOB:
case MEDIUMBLOB:
case LONGBLOB:
return DataTypes.BYTES();
case SET:
return DataTypes.ARRAY(DataTypes.STRING());
default:
throw new UnsupportedOperationException(
String.format("Don't support MySQL type '%s' yet.", type));
}
}