in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/InternalSerializers.java [46:90]
private static TypeSerializer<?> createInternal(DataType type) {
// ordered by type root definition
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
return StringDataSerializer.INSTANCE;
case BOOLEAN:
return BooleanSerializer.INSTANCE;
case BINARY:
case VARBINARY:
return BytePrimitiveArraySerializer.INSTANCE;
case DECIMAL:
return new DecimalDataSerializer(getPrecision(type), getScale(type));
case TINYINT:
return ByteSerializer.INSTANCE;
case SMALLINT:
return ShortSerializer.INSTANCE;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
return IntSerializer.INSTANCE;
case BIGINT:
return LongSerializer.INSTANCE;
case FLOAT:
return FloatSerializer.INSTANCE;
case DOUBLE:
return DoubleSerializer.INSTANCE;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return new TimestampDataSerializer(getPrecision(type));
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return new LocalZonedTimestampDataSerializer(getPrecision(type));
case TIMESTAMP_WITH_TIME_ZONE:
return new ZonedTimestampDataSerializer(getPrecision(type));
case ARRAY:
return new ArrayDataSerializer(((ArrayType) type).getElementType());
case ROW:
return new RecordDataSerializer();
case MAP:
MapType mapType = (MapType) type;
return new MapDataSerializer(mapType.getKeyType(), mapType.getValueType());
default:
throw new UnsupportedOperationException(
"Unsupported type '" + type + "' to get internal serializer");
}
}