in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java [185:243]
public DataType deserialize(DataInputView source) throws IOException {
DataTypeClass dataTypeClass = enumSerializer.deserialize(source);
if (dataTypeClass == DataTypeClass.ROW) {
return getRowTypeSerializer().deserialize(source);
}
boolean isNullable = source.readBoolean();
switch (dataTypeClass) {
case BINARY:
int binaryLength = source.readInt();
return binaryLength == 0
? BinaryType.ofEmptyLiteral()
: new BinaryType(isNullable, binaryLength);
case ARRAY:
return new ArrayType(isNullable, this.deserialize(source));
case BOOLEAN:
return new BooleanType(isNullable);
case DECIMAL:
int precision = source.readInt();
int scale = source.readInt();
return new DecimalType(isNullable, precision, scale);
case LOCAL_ZONED_TIMESTAMP:
return new LocalZonedTimestampType(isNullable, source.readInt());
case VARBINARY:
return new VarBinaryType(isNullable, source.readInt());
case CHAR:
int charLength = source.readInt();
return charLength == 0
? CharType.ofEmptyLiteral()
: new CharType(isNullable, charLength);
case SMALLINT:
return new SmallIntType(isNullable);
case TIMESTAMP:
return new TimestampType(isNullable, source.readInt());
case INT:
return new IntType(isNullable);
case FLOAT:
return new FloatType(isNullable);
case MAP:
DataType keyType = this.deserialize(source);
DataType valType = this.deserialize(source);
return new MapType(isNullable, keyType, valType);
case TIME:
return new TimeType(isNullable, source.readInt());
case TINYINT:
return new TinyIntType(isNullable);
case VARCHAR:
return new VarCharType(isNullable, source.readInt());
case DATE:
return new DateType(isNullable);
case ZONED_TIMESTAMP:
return new ZonedTimestampType(isNullable, source.readInt());
case DOUBLE:
return new DoubleType(isNullable);
case BIGINT:
return new BigIntType(isNullable);
default:
throw new IllegalArgumentException("Unknown data type : " + dataTypeClass);
}
}