in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java [126:249]
private static DataType convertFromColumn(Column column, boolean tinyInt1isBit) {
    // Maps a MySQL column to a pipeline DataType, dispatching on the column's type name.
    //
    // column        - the column whose typeName(), length() and scale() drive the mapping.
    // tinyInt1isBit - when true, a TINYINT column of length 1 is mapped to BOOLEAN;
    //                 when false it stays TINYINT.
    // returns       - the DataType chosen for the column's type name.
    // throws        - UnsupportedOperationException when the type name matches no case below.
    String typeName = column.typeName();
    switch (typeName) {
        case BIT:
            // BIT(1) is treated as a boolean flag; wider bit fields are packed into
            // ceil(length / 8) bytes.
            return column.length() == 1
                    ? DataTypes.BOOLEAN()
                    : DataTypes.BINARY((column.length() + 7) / 8);
        case BOOL:
        case BOOLEAN:
            return DataTypes.BOOLEAN();
        case TINYINT:
            // MySQL has no native boolean type; it uses TINYINT(1) to represent booleans.
            // Users should not store numbers in TINYINT(1). The JDBC URL parameter
            // tinyInt1isBit=false can change the returned value, but it is not a general
            // solution.
            // Note: MyBatis and mysql-connector-java map TINYINT(1) to boolean by default.
            return (column.length() == 1 && tinyInt1isBit)
                    ? DataTypes.BOOLEAN()
                    : DataTypes.TINYINT();
        // Unsigned integer types are widened to the next larger signed type
        // (TINYINT UNSIGNED -> SMALLINT, SMALLINT UNSIGNED -> INT, INT UNSIGNED -> BIGINT).
        case TINYINT_UNSIGNED:
        case TINYINT_UNSIGNED_ZEROFILL:
        case SMALLINT:
            return DataTypes.SMALLINT();
        case SMALLINT_UNSIGNED:
        case SMALLINT_UNSIGNED_ZEROFILL:
        case INT:
        case INTEGER:
        case MEDIUMINT:
        case MEDIUMINT_UNSIGNED:
        case MEDIUMINT_UNSIGNED_ZEROFILL:
        case YEAR:
            return DataTypes.INT();
        case INT_UNSIGNED:
        case INT_UNSIGNED_ZEROFILL:
        case INTEGER_UNSIGNED:
        case INTEGER_UNSIGNED_ZEROFILL:
        case BIGINT:
            return DataTypes.BIGINT();
        case BIGINT_UNSIGNED:
        case BIGINT_UNSIGNED_ZEROFILL:
        case SERIAL:
            // No wider integer type is used here; unsigned 64-bit values are carried as
            // a 20-digit decimal with scale 0.
            return DataTypes.DECIMAL(20, 0);
        case FLOAT:
        case FLOAT_UNSIGNED:
        case FLOAT_UNSIGNED_ZEROFILL:
            if (column.length() != FLOAT_LENGTH_UNSPECIFIED_FLAG) {
                // For FLOAT types with length provided explicitly, treat it like DOUBLE
                return DataTypes.DOUBLE();
            } else {
                return DataTypes.FLOAT();
            }
        case REAL:
        case REAL_UNSIGNED:
        case REAL_UNSIGNED_ZEROFILL:
        case DOUBLE:
        case DOUBLE_UNSIGNED:
        case DOUBLE_UNSIGNED_ZEROFILL:
        case DOUBLE_PRECISION:
        case DOUBLE_PRECISION_UNSIGNED:
        case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
            return DataTypes.DOUBLE();
        case NUMERIC:
        case NUMERIC_UNSIGNED:
        case NUMERIC_UNSIGNED_ZEROFILL:
        case FIXED:
        case FIXED_UNSIGNED:
        case FIXED_UNSIGNED_ZEROFILL:
        case DECIMAL:
        case DECIMAL_UNSIGNED:
        case DECIMAL_UNSIGNED_ZEROFILL:
            // The column length is used as the decimal precision; a missing scale
            // defaults to 0. Precisions above 38 fall back to STRING
            // (presumably 38 is the largest precision DataTypes.DECIMAL accepts - confirm).
            return column.length() <= 38
                    ? DataTypes.DECIMAL(column.length(), column.scale().orElse(0))
                    : DataTypes.STRING();
        case TIME:
            // A non-negative length is passed on as the precision argument; otherwise the
            // no-argument TIME() factory is used.
            return column.length() >= 0 ? DataTypes.TIME(column.length()) : DataTypes.TIME();
        case DATE:
            return DataTypes.DATE();
        case DATETIME:
            // A negative length means no precision is available; precision 0 is used then.
            return column.length() >= 0
                    ? DataTypes.TIMESTAMP(column.length())
                    : DataTypes.TIMESTAMP(0);
        case TIMESTAMP:
            // Same precision handling as DATETIME, but mapped to the local-time-zone variant.
            return column.length() >= 0
                    ? DataTypes.TIMESTAMP_LTZ(column.length())
                    : DataTypes.TIMESTAMP_LTZ(0);
        case CHAR:
            // CHAR(0) maps to the dedicated empty-literal type; a negative (unspecified)
            // length falls back to CHAR(1).
            return column.length() > 0
                    ? DataTypes.CHAR(column.length())
                    : column.length() == 0 ? CharType.ofEmptyLiteral() : DataTypes.CHAR(1);
        case VARCHAR:
            // MySQL permits VARCHAR(0). Clamp non-positive lengths to 1 so that such
            // columns do not hit the minimum-length check of the VARCHAR type
            // (a VARCHAR(0) column can only hold '' or NULL, which VARCHAR(1) also accepts).
            return DataTypes.VARCHAR(Math.max(1, column.length()));
        case TINYTEXT:
        case TEXT:
        case MEDIUMTEXT:
        case LONGTEXT:
        case JSON:
        case ENUM:
        case GEOMETRY:
        case POINT:
        case LINESTRING:
        case POLYGON:
        case GEOMETRYCOLLECTION:
        case GEOMCOLLECTION:
        case MULTIPOINT:
        case MULTIPOLYGON:
        case MULTILINESTRING:
            // Text, JSON, ENUM and all spatial types are represented as strings.
            return DataTypes.STRING();
        case BINARY:
            // Mirrors the CHAR handling: BINARY(0) maps to the empty-literal type and a
            // negative (unspecified) length falls back to BINARY(1).
            return column.length() > 0
                    ? DataTypes.BINARY(column.length())
                    : column.length() == 0 ? BinaryType.ofEmptyLiteral() : DataTypes.BINARY(1);
        case VARBINARY:
            // MySQL permits VARBINARY(0). Clamp non-positive lengths to 1 for the same
            // reason as in the VARCHAR branch.
            return DataTypes.VARBINARY(Math.max(1, column.length()));
        case TINYBLOB:
        case BLOB:
        case MEDIUMBLOB:
        case LONGBLOB:
            return DataTypes.BYTES();
        case SET:
            // A SET value is represented as an array of strings.
            return DataTypes.ARRAY(DataTypes.STRING());
        default:
            throw new UnsupportedOperationException(
                    String.format("MySQL type '%s' is not supported yet.", typeName));
    }
}