in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/RowDataTiKVEventDeserializationSchemaBase.java [126:225]
public static TiKVDeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
// if no matched user defined converter, fallback to the default converter
switch (type.getTypeRoot()) {
case NULL:
return new TiKVDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(
Object object,
TiTableInfo schema,
org.tikv.common.types.DataType dataType) {
return null;
}
};
case BOOLEAN:
return convertToBoolean();
case TINYINT:
return new TiKVDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(
Object object,
TiTableInfo schema,
org.tikv.common.types.DataType dataType) {
return Byte.parseByte(object.toString());
}
};
case SMALLINT:
return new TiKVDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(
Object object,
TiTableInfo schema,
org.tikv.common.types.DataType dataType) {
return Short.parseShort(object.toString());
}
};
case INTEGER:
case INTERVAL_YEAR_MONTH:
return convertToInt();
case BIGINT:
case INTERVAL_DAY_TIME:
return convertToLong();
case DATE:
return convertToDate();
case TIME_WITHOUT_TIME_ZONE:
return convertToTime();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return convertToTimestamp();
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return convertToLocalTimeZoneTimestamp();
case FLOAT:
return convertToFloat();
case DOUBLE:
return convertToDouble();
case CHAR:
case VARCHAR:
return convertToString();
case BINARY:
case VARBINARY:
return convertToBinary();
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ROW:
return createRowConverter((RowType) type);
case ARRAY:
return new TiKVDeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(
Object object,
TiTableInfo tableInfo,
org.tikv.common.types.DataType dataType)
throws Exception {
String[] strArray = ((String) object).split(",");
StringData[] stringDataArray = new StringData[strArray.length];
for (int i = 0; i < strArray.length; i++) {
stringDataArray[i] = StringData.fromString(strArray[i]);
}
return new GenericArrayData(stringDataArray);
}
};
case MAP:
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}