in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/converter/OceanBaseValueConverters.java [240:317]
public ValueConverter converter(Column column, Field fieldDefn) {
    // Selects the ValueConverter for a column based on its JDBC type, with adjustments for
    // unsigned integer columns, the configured compatible mode and the temporal precision mode.
    //
    // column    - the column whose JDBC type, type name and length drive the selection
    // fieldDefn - the field definition handed through to the chosen convertXxx method
    // returns   - a converter for the column; JDBC types not listed here are delegated to
    //             super.converter(column, fieldDefn)
    final boolean mysqlMode = "mysql".equalsIgnoreCase(compatibleMode);
    switch (column.jdbcType()) {
        case Types.BIT:
            return convertBits(column, fieldDefn);
        case Types.TINYINT:
            // A TINYINT of length 1 is converted with the single-bit converter.
            if (column.length() == 1) {
                return data -> convertBit(column, fieldDefn, data);
            }
            // Unsigned columns use the converter of the next wider integer type.
            if (isUnsignedColumn(column)) {
                return data -> convertSmallInt(column, fieldDefn, data);
            }
            return data -> convertTinyInt(column, fieldDefn, data);
        case Types.SMALLINT:
            if (isUnsignedColumn(column)) {
                return data -> convertInteger(column, fieldDefn, data);
            }
            return data -> convertSmallInt(column, fieldDefn, data);
        case Types.INTEGER:
            // MEDIUMINT is checked before the unsigned test, so it always uses
            // convertInteger. The prefix comparison uses regionMatches(ignoreCase = true)
            // instead of toUpperCase() so the result does not depend on the JVM default
            // locale (e.g. the Turkish dotted/dotless 'i' mapping).
            if (column.typeName().regionMatches(true, 0, "MEDIUMINT", 0, "MEDIUMINT".length())) {
                return data -> convertInteger(column, fieldDefn, data);
            }
            if (isUnsignedColumn(column)) {
                return data -> convertBigInt(column, fieldDefn, data);
            }
            return data -> convertInteger(column, fieldDefn, data);
        case Types.BIGINT:
            if (isUnsignedColumn(column)) {
                switch (bigIntUnsignedMode) {
                    case LONG:
                        return data -> convertBigInt(column, fieldDefn, data);
                    case PRECISE:
                        return data -> convertUnsignedBigint(column, fieldDefn, data);
                    default:
                        // Any other mode falls through to the signed BIGINT handling below.
                        break;
                }
            }
            return data -> convertBigInt(column, fieldDefn, data);
        case Types.FLOAT:
            return data -> convertDecimal(column, fieldDefn, data);
        case Types.NUMERIC:
        case Types.DECIMAL:
            if (mysqlMode) {
                return data -> convertDecimal(column, fieldDefn, data);
            }
            return data -> convertNumeric(column, fieldDefn, data);
        case Types.REAL:
            return data -> convertReal(column, fieldDefn, data);
        case Types.DOUBLE:
            return data -> convertDouble(column, fieldDefn, data);
        case Types.DATE:
            if (mysqlMode) {
                // In "mysql" compatible mode a YEAR column also arrives under Types.DATE.
                if (column.typeName().equalsIgnoreCase("YEAR")) {
                    return data -> convertYearToInt(column, fieldDefn, data);
                }
                if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
                    return data -> convertDateToEpochDays(column, fieldDefn, data);
                }
                return data -> convertDateToEpochDaysAsDate(column, fieldDefn, data);
            }
            // Outside "mysql" compatible mode DATE values go through the timestamp converter.
            return data -> convertTimestamp(column, fieldDefn, data);
        case Types.TIME:
            return data -> convertTime(column, fieldDefn, data);
        case Types.TIMESTAMP:
            return data -> convertTimestamp(column, fieldDefn, data);
        case Types.CHAR:
        case Types.VARCHAR:
        case Types.LONGVARCHAR:
        case Types.NCHAR:
        case Types.NVARCHAR:
        case Types.CLOB:
            return data -> convertString(column, fieldDefn, data);
        case Types.BINARY:
        case Types.VARBINARY:
        case Types.LONGVARBINARY:
        case Types.BLOB:
            return data -> convertBinary(column, fieldDefn, data, binaryMode);
        default:
            return super.converter(column, fieldDefn);
    }
}