public SchemaBuilder schemaBuilder()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java [174:249]


    public SchemaBuilder schemaBuilder(Column column) {
        // Handle a few MySQL-specific types based upon how they are handled by the MySQL binlog
        // client ...
        String typeName = column.typeName().toUpperCase();
        if (matches(typeName, "JSON")) {
            return Json.builder();
        }
        if (matches(typeName, "POINT")) {
            return io.debezium.data.geometry.Point.builder();
        }
        if (matches(typeName, "GEOMETRY")
                || matches(typeName, "LINESTRING")
                || matches(typeName, "POLYGON")
                || matches(typeName, "MULTIPOINT")
                || matches(typeName, "MULTILINESTRING")
                || matches(typeName, "MULTIPOLYGON")
                || isGeometryCollection(typeName)) {
            return io.debezium.data.geometry.Geometry.builder();
        }
        if (matches(typeName, "YEAR")) {
            return Year.builder();
        }
        if (matches(typeName, "ENUM")) {
            String commaSeparatedOptions = extractEnumAndSetOptionsAsString(column);
            return io.debezium.data.Enum.builder(commaSeparatedOptions);
        }
        if (matches(typeName, "SET")) {
            String commaSeparatedOptions = extractEnumAndSetOptionsAsString(column);
            return io.debezium.data.EnumSet.builder(commaSeparatedOptions);
        }
        if (matches(typeName, "SMALLINT UNSIGNED")
                || matches(typeName, "SMALLINT UNSIGNED ZEROFILL")
                || matches(typeName, "INT2 UNSIGNED")
                || matches(typeName, "INT2 UNSIGNED ZEROFILL")) {
            // In order to capture unsigned SMALLINT 16-bit data source, INT32 will be required to
            // safely capture all valid values
            // Source:
            // https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html
            return SchemaBuilder.int32();
        }
        if (matches(typeName, "INT UNSIGNED")
                || matches(typeName, "INT UNSIGNED ZEROFILL")
                || matches(typeName, "INT4 UNSIGNED")
                || matches(typeName, "INT4 UNSIGNED ZEROFILL")) {
            // In order to capture unsigned INT 32-bit data source, INT64 will be required to safely
            // capture all valid values
            // Source:
            // https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html
            return SchemaBuilder.int64();
        }
        if (matches(typeName, "BIGINT UNSIGNED")
                || matches(typeName, "BIGINT UNSIGNED ZEROFILL")
                || matches(typeName, "INT8 UNSIGNED")
                || matches(typeName, "INT8 UNSIGNED ZEROFILL")) {
            switch (super.bigIntUnsignedMode) {
                case LONG:
                    return SchemaBuilder.int64();
                case PRECISE:
                    // In order to capture unsigned INT 64-bit data source,
                    // org.apache.kafka.connect.data.Decimal:Byte will be required to safely capture
                    // all valid values with scale of 0
                    // Source:
                    // https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html
                    return Decimal.builder(0);
            }
        }
        if ((matches(typeName, "FLOAT")
                        || matches(typeName, "FLOAT UNSIGNED")
                        || matches(typeName, "FLOAT UNSIGNED ZEROFILL"))
                && !column.scale().isPresent()
                && column.length() <= 24) {
            return SchemaBuilder.float32();
        }
        // Otherwise, let the base class handle it ...
        return super.schemaBuilder(column);
    }