public ValueConverter converter()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/converter/OceanBaseValueConverters.java [240:317]


    /**
     * Selects the {@link ValueConverter} for a column based on its JDBC type.
     *
     * <p>Several branches additionally consult the column's declared length, its type name, its
     * signedness (via {@code isUnsignedColumn}), and the configured {@code compatibleMode},
     * {@code bigIntUnsignedMode}, adaptive time precision flags and {@code binaryMode}. JDBC
     * types that are not listed here are delegated to the superclass implementation.
     *
     * @param column the column whose values will be converted
     * @param fieldDefn the field definition handed through to the individual convert methods
     * @return the converter chosen for the column, or whatever the superclass returns for JDBC
     *     types not handled here
     */
    public ValueConverter converter(Column column, Field fieldDefn) {
        switch (column.jdbcType()) {
            case Types.BIT:
                return convertBits(column, fieldDefn);
            case Types.TINYINT:
                // A TINYINT declared with length 1 is routed to the single-bit converter.
                if (column.length() == 1) {
                    return data -> convertBit(column, fieldDefn, data);
                }
                // Unsigned columns use the next wider converter (presumably so the full
                // unsigned range fits); the same pattern repeats for SMALLINT and INTEGER.
                if (isUnsignedColumn(column)) {
                    return data -> convertSmallInt(column, fieldDefn, data);
                }
                return data -> convertTinyInt(column, fieldDefn, data);
            case Types.SMALLINT:
                if (isUnsignedColumn(column)) {
                    return data -> convertInteger(column, fieldDefn, data);
                }
                return data -> convertSmallInt(column, fieldDefn, data);
            case Types.INTEGER:
                // Checked before the unsigned test, so MEDIUMINT columns always use
                // convertInteger regardless of signedness.
                // regionMatches(ignoreCase = true) is locale-independent, unlike
                // toUpperCase(), which maps 'i' to a dotted capital under e.g. a Turkish
                // default locale and would then miss the "MEDIUMINT" prefix.
                if (column.typeName().regionMatches(true, 0, "MEDIUMINT", 0, "MEDIUMINT".length())) {
                    return data -> convertInteger(column, fieldDefn, data);
                }
                if (isUnsignedColumn(column)) {
                    return data -> convertBigInt(column, fieldDefn, data);
                }
                return data -> convertInteger(column, fieldDefn, data);
            case Types.BIGINT:
                if (isUnsignedColumn(column)) {
                    switch (bigIntUnsignedMode) {
                        case LONG:
                            return data -> convertBigInt(column, fieldDefn, data);
                        case PRECISE:
                            return data -> convertUnsignedBigint(column, fieldDefn, data);
                    }
                    // Any other mode falls through to the convertBigInt return below.
                }
                return data -> convertBigInt(column, fieldDefn, data);
            case Types.FLOAT:
                return data -> convertDecimal(column, fieldDefn, data);
            case Types.NUMERIC:
            case Types.DECIMAL:
                // The null-safe literal-first comparison keeps a missing compatibleMode on
                // the non-"mysql" path.
                if ("mysql".equalsIgnoreCase(compatibleMode)) {
                    return data -> convertDecimal(column, fieldDefn, data);
                }
                return data -> convertNumeric(column, fieldDefn, data);
            case Types.REAL:
                return data -> convertReal(column, fieldDefn, data);
            case Types.DOUBLE:
                return data -> convertDouble(column, fieldDefn, data);
            case Types.DATE:
                if ("mysql".equalsIgnoreCase(compatibleMode)) {
                    // In "mysql" mode a column whose type name is YEAR also arrives here and
                    // is converted to an int.
                    if (column.typeName().equalsIgnoreCase("YEAR")) {
                        return data -> convertYearToInt(column, fieldDefn, data);
                    }
                    if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
                        return data -> convertDateToEpochDays(column, fieldDefn, data);
                    }
                    return data -> convertDateToEpochDaysAsDate(column, fieldDefn, data);
                }
                // Outside "mysql" mode, DATE columns go through the timestamp converter.
                return data -> convertTimestamp(column, fieldDefn, data);
            case Types.TIME:
                return data -> convertTime(column, fieldDefn, data);
            case Types.TIMESTAMP:
                return data -> convertTimestamp(column, fieldDefn, data);
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGVARCHAR:
            case Types.NCHAR:
            case Types.NVARCHAR:
            case Types.CLOB:
                return data -> convertString(column, fieldDefn, data);
            case Types.BINARY:
            case Types.VARBINARY:
            case Types.LONGVARBINARY:
            case Types.BLOB:
                return data -> convertBinary(column, fieldDefn, data, binaryMode);
            default:
                return super.converter(column, fieldDefn);
        }
    }