in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java [88:157]
static void write(
BinaryWriter writer, int pos, Object o, DataType type, TypeSerializer<?> serializer) {
switch (type.getTypeRoot()) {
case BOOLEAN:
writer.writeBoolean(pos, (boolean) o);
break;
case TINYINT:
writer.writeByte(pos, (byte) o);
break;
case SMALLINT:
writer.writeShort(pos, (short) o);
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
writer.writeInt(pos, (int) o);
break;
case BIGINT:
writer.writeLong(pos, (long) o);
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) type;
writer.writeTimestamp(pos, (TimestampData) o, timestampType.getPrecision());
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType lzTs = (LocalZonedTimestampType) type;
writer.writeLocalZonedTimestamp(
pos, (LocalZonedTimestampData) o, lzTs.getPrecision());
break;
case TIMESTAMP_WITH_TIME_ZONE:
ZonedTimestampType zTs = (ZonedTimestampType) type;
writer.writeZonedTimestamp(pos, (ZonedTimestampData) o, zTs.getPrecision());
break;
case FLOAT:
writer.writeFloat(pos, (float) o);
break;
case DOUBLE:
writer.writeDouble(pos, (double) o);
break;
case CHAR:
case VARCHAR:
writer.writeString(pos, (StringData) o);
break;
case DECIMAL:
DecimalType decimalType = (DecimalType) type;
writer.writeDecimal(pos, (DecimalData) o, decimalType.getPrecision());
break;
case ARRAY:
if (serializer instanceof NullableSerializerWrapper) {
serializer = ((NullableSerializerWrapper) serializer).getWrappedSerializer();
}
writer.writeArray(pos, (ArrayData) o, (ArrayDataSerializer) serializer);
break;
case MAP:
if (serializer instanceof NullableSerializerWrapper) {
serializer = ((NullableSerializerWrapper) serializer).getWrappedSerializer();
}
writer.writeMap(pos, (MapData) o, (MapDataSerializer) serializer);
break;
case ROW:
writer.writeRecord(pos, (RecordData) o, (TypeSerializer<RecordData>) serializer);
break;
case BINARY:
case VARBINARY:
writer.writeBinary(pos, (byte[]) o);
break;
default:
throw new UnsupportedOperationException("Not support type: " + type);
}
}