in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTypeUtils.java [89:147]
/**
 * Translates the MySQL type of a single column into a Flink {@link DataType}.
 *
 * <p>The branch is selected by {@code column.typeName()}. Some branches additionally read
 * {@code column.length()} (and, for DECIMAL, {@code column.scale()}) to parameterize the
 * resulting type.
 *
 * @param column the column whose type is to be translated
 * @return the Flink data type selected for the column's type name
 * @throws UnsupportedOperationException if the type name matches none of the handled cases
 */
private static DataType convertFromColumn(Column column) {
    final String mysqlType = column.typeName();
    switch (mysqlType) {
        // ---- integer family: each unsigned variant shares the next wider signed type ----
        case TINYINT: {
            // A TINYINT declared with length 1 is reported as BOOLEAN.
            if (column.length() == 1) {
                return DataTypes.BOOLEAN();
            }
            return DataTypes.TINYINT();
        }
        case TINYINT_UNSIGNED:
        case TINYINT_UNSIGNED_ZEROFILL:
        case SMALLINT: {
            return DataTypes.SMALLINT();
        }
        case SMALLINT_UNSIGNED:
        case SMALLINT_UNSIGNED_ZEROFILL:
        case INT:
        case MEDIUMINT: {
            return DataTypes.INT();
        }
        case INT_UNSIGNED:
        case INT_UNSIGNED_ZEROFILL:
        case MEDIUMINT_UNSIGNED:
        case MEDIUMINT_UNSIGNED_ZEROFILL:
        case BIGINT: {
            return DataTypes.BIGINT();
        }
        case BIGINT_UNSIGNED:
        case BIGINT_UNSIGNED_ZEROFILL: {
            // No wider integer type is used here; unsigned BIGINT becomes DECIMAL(20, 0).
            return DataTypes.DECIMAL(20, 0);
        }

        // ---- approximate and exact numerics ----
        case FLOAT:
        case FLOAT_UNSIGNED:
        case FLOAT_UNSIGNED_ZEROFILL: {
            return DataTypes.FLOAT();
        }
        case DOUBLE:
        case DOUBLE_UNSIGNED:
        case DOUBLE_UNSIGNED_ZEROFILL: {
            return DataTypes.DOUBLE();
        }
        case DECIMAL: {
            // The column length is used as the precision; a missing scale defaults to 0.
            // NOTE(review): Flink is believed to cap DECIMAL precision at 38 while MySQL
            // permits larger values - confirm wide DECIMAL columns are handled elsewhere.
            final int precision = column.length();
            final int scale = column.scale().orElse(0);
            return DataTypes.DECIMAL(precision, scale);
        }

        // ---- temporal types ----
        case TIME: {
            // A negative length means no explicit precision; use the parameterless TIME().
            final int timePrecision = column.length();
            if (timePrecision < 0) {
                return DataTypes.TIME();
            }
            return DataTypes.TIME(timePrecision);
        }
        case DATE: {
            return DataTypes.DATE();
        }
        case DATETIME:
        case TIMESTAMP: {
            // Both names share one mapping; a negative length selects parameterless TIMESTAMP().
            final int timestampPrecision = column.length();
            if (timestampPrecision < 0) {
                return DataTypes.TIMESTAMP();
            }
            return DataTypes.TIMESTAMP(timestampPrecision);
        }

        // ---- character data ----
        case CHAR: {
            final int charLength = column.length();
            return DataTypes.CHAR(charLength);
        }
        case VARCHAR: {
            final int varcharLength = column.length();
            return DataTypes.VARCHAR(varcharLength);
        }
        case TEXT: {
            return DataTypes.STRING();
        }

        // ---- binary data ----
        case BINARY: {
            final int binaryLength = column.length();
            return DataTypes.BINARY(binaryLength);
        }
        case VARBINARY: {
            final int varbinaryLength = column.length();
            return DataTypes.VARBINARY(varbinaryLength);
        }
        case BLOB: {
            return DataTypes.BYTES();
        }

        default: {
            // Any type name not listed above is rejected explicitly.
            final String message =
                    String.format("Don't support MySQL type '%s' yet.", mysqlType);
            throw new UnsupportedOperationException(message);
        }
    }
}