public SchemaBuilder schemaBuilder()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/converter/OceanBaseValueConverters.java [108:201]


    public SchemaBuilder schemaBuilder(Column column) {
        logger.debug(
                "Building schema for column {} of type {} named {} with constraints ({},{})",
                column.name(),
                column.jdbcType(),
                column.typeName(),
                column.length(),
                column.scale());

        switch (column.jdbcType()) {
            case Types.BIT:
                if (column.length() > 1) {
                    return Bits.builder(column.length());
                }
                return SchemaBuilder.bool();
            case Types.TINYINT:
                if (column.length() == 1) {
                    return SchemaBuilder.bool();
                }
                if (isUnsignedColumn(column)) {
                    return SchemaBuilder.int16();
                }
                return SchemaBuilder.int8();
            case Types.SMALLINT:
                if (isUnsignedColumn(column)) {
                    return SchemaBuilder.int32();
                }
                return SchemaBuilder.int16();
            case Types.INTEGER:
                if (!column.typeName().toUpperCase().startsWith("MEDIUMINT")
                        && isUnsignedColumn(column)) {
                    return SchemaBuilder.int64();
                }
                return SchemaBuilder.int32();
            case Types.BIGINT:
                if (isUnsignedColumn(column)) {
                    return Decimal.builder(0);
                }
                return SchemaBuilder.int64();
            case Types.FLOAT:
                return getDecimalSchema(column);
            case Types.NUMERIC:
            case Types.DECIMAL:
                if ("mysql".equalsIgnoreCase(compatibleMode)) {
                    return getDecimalSchema(column);
                }
                return getNumericSchema(column);
            case Types.REAL:
                return SchemaBuilder.float32();
            case Types.DOUBLE:
                return SchemaBuilder.float64();
            case Types.DATE:
                if ("mysql".equalsIgnoreCase(compatibleMode)) {
                    if (column.typeName().equalsIgnoreCase("YEAR")) {
                        return io.debezium.time.Year.builder();
                    }
                    if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
                        return io.debezium.time.Date.builder();
                    }
                    return org.apache.kafka.connect.data.Date.builder();
                }
                return getTimestampSchema(column);
            case Types.TIME:
                if (adaptiveTimeMicrosecondsPrecisionMode) {
                    return io.debezium.time.MicroTime.builder();
                }
                if (adaptiveTimePrecisionMode) {
                    if (getTimePrecision(column) <= 3) {
                        return io.debezium.time.Time.builder();
                    }
                    if (getTimePrecision(column) <= 6) {
                        return io.debezium.time.MicroTime.builder();
                    }
                    return io.debezium.time.NanoTime.builder();
                }
                return org.apache.kafka.connect.data.Time.builder();
            case Types.TIMESTAMP:
                return getTimestampSchema(column);
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGVARCHAR:
            case Types.NCHAR:
            case Types.NVARCHAR:
            case Types.CLOB:
                return SchemaBuilder.string();
            case Types.BINARY:
            case Types.VARBINARY:
            case Types.LONGVARBINARY:
            case Types.BLOB:
                return binaryMode.getSchema();
            default:
                return super.schemaBuilder(column);
        }
    }