in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java [174:249]
public SchemaBuilder schemaBuilder(Column column) {
    // Selects the schema builder for column types that need MySQL-specific treatment, based
    // upon how they are handled by the MySQL binlog client. The checks run in a fixed order
    // and the first match wins; any type not matched here is delegated to the base class.
    // NOTE(review): toUpperCase() relies on the JVM default locale - confirm this is intended.
    final String upperType = column.typeName().toUpperCase();

    if (matches(upperType, "JSON")) {
        return Json.builder();
    }

    if (matches(upperType, "POINT")) {
        return io.debezium.data.geometry.Point.builder();
    }

    // Every other spatial type shares the generic geometry schema.
    final boolean genericGeometry =
            matches(upperType, "GEOMETRY")
                    || matches(upperType, "LINESTRING")
                    || matches(upperType, "POLYGON")
                    || matches(upperType, "MULTIPOINT")
                    || matches(upperType, "MULTILINESTRING")
                    || matches(upperType, "MULTIPOLYGON")
                    || isGeometryCollection(upperType);
    if (genericGeometry) {
        return io.debezium.data.geometry.Geometry.builder();
    }

    if (matches(upperType, "YEAR")) {
        return Year.builder();
    }

    // ENUM and SET both carry their allowed options as a comma-separated string.
    if (matches(upperType, "ENUM")) {
        return io.debezium.data.Enum.builder(extractEnumAndSetOptionsAsString(column));
    }
    if (matches(upperType, "SET")) {
        return io.debezium.data.EnumSet.builder(extractEnumAndSetOptionsAsString(column));
    }

    // Unsigned 16-bit SMALLINT values do not all fit a signed 16-bit integer, so INT32 is
    // required to safely capture every valid value.
    // Source:
    // https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html
    final boolean unsignedSmallInt =
            matches(upperType, "SMALLINT UNSIGNED")
                    || matches(upperType, "SMALLINT UNSIGNED ZEROFILL")
                    || matches(upperType, "INT2 UNSIGNED")
                    || matches(upperType, "INT2 UNSIGNED ZEROFILL");
    if (unsignedSmallInt) {
        return SchemaBuilder.int32();
    }

    // Likewise, unsigned 32-bit INT values require INT64 to safely capture every valid value.
    // Source:
    // https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html
    final boolean unsignedInt =
            matches(upperType, "INT UNSIGNED")
                    || matches(upperType, "INT UNSIGNED ZEROFILL")
                    || matches(upperType, "INT4 UNSIGNED")
                    || matches(upperType, "INT4 UNSIGNED ZEROFILL");
    if (unsignedInt) {
        return SchemaBuilder.int64();
    }

    final boolean unsignedBigInt =
            matches(upperType, "BIGINT UNSIGNED")
                    || matches(upperType, "BIGINT UNSIGNED ZEROFILL")
                    || matches(upperType, "INT8 UNSIGNED")
                    || matches(upperType, "INT8 UNSIGNED ZEROFILL");
    if (unsignedBigInt) {
        switch (super.bigIntUnsignedMode) {
            case PRECISE:
                // Unsigned 64-bit values need org.apache.kafka.connect.data.Decimal with a
                // scale of 0 to safely capture every valid value.
                // Source:
                // https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html
                return Decimal.builder(0);
            case LONG:
                return SchemaBuilder.int64();
            default:
                // Any other mode is not handled here; continue with the checks below.
                break;
        }
    }

    final boolean floatFamily =
            matches(upperType, "FLOAT")
                    || matches(upperType, "FLOAT UNSIGNED")
                    || matches(upperType, "FLOAT UNSIGNED ZEROFILL");
    if (floatFamily) {
        // Only a FLOAT declared without a scale and with a length of at most 24 is mapped
        // to FLOAT32; any other FLOAT declaration is left to the base class.
        final boolean noScale = !column.scale().isPresent();
        if (noScale && column.length() <= 24) {
            return SchemaBuilder.float32();
        }
    }

    // Nothing MySQL-specific applied, so the base class decides.
    return super.schemaBuilder(column);
}