in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java [534:630]
public DeserializationRuntimeConverter createNotNullConverter(
LogicalType type,
ZoneId serverTimeZone,
DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
// user defined converter has a higher resolve order
Optional<DeserializationRuntimeConverter> converter =
userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone);
if (converter.isPresent()) {
return converter.get();
}
// if no matched user defined converter, fallback to the default converter
switch (type.getTypeRoot()) {
case NULL:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return null;
}
@Override
public Object convert(Object dbzObj, Schema schema, TableChange tableSchema) throws Exception {
return convert(dbzObj, schema);
}
};
case BOOLEAN:
return convertToBoolean();
case TINYINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Byte.parseByte(dbzObj.toString());
}
@Override
public Object convert(Object dbzObj, Schema schema, TableChange tableSchema) throws Exception {
return convert(dbzObj, schema);
}
};
case SMALLINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Short.parseShort(dbzObj.toString());
}
@Override
public Object convert(Object dbzObj, Schema schema, TableChange tableSchema) throws Exception {
return convert(dbzObj, schema);
}
};
case INTEGER:
case INTERVAL_YEAR_MONTH:
return convertToInt();
case BIGINT:
case INTERVAL_DAY_TIME:
return convertToLong();
case DATE:
return convertToDate();
case TIME_WITHOUT_TIME_ZONE:
return convertToTime();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return convertToTimestamp(serverTimeZone);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return convertToLocalTimeZoneTimestamp(serverTimeZone);
case FLOAT:
return convertToFloat();
case DOUBLE:
return convertToDouble();
case CHAR:
case VARCHAR:
return convertToString();
case BINARY:
case VARBINARY:
return convertToBinary();
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ROW:
return createRowConverter(
(RowType) type, serverTimeZone, userDefinedConverterFactory);
case ARRAY:
case MAP:
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}