in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlussRowToFlinkRowConverter.java [112:152]
private FlussDeserializationConverter createInternalConverter(DataType flussDataType) {
switch (flussDataType.getTypeRoot()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case BIGINT:
case FLOAT:
case DOUBLE:
return (flussField) -> flussField;
case CHAR:
case STRING:
return (flussField) -> StringData.fromBytes(((BinaryString) flussField).toBytes());
case BYTES:
case BINARY:
return (flussField) -> flussField;
case DECIMAL:
return (flussField) -> {
Decimal decimal = (Decimal) flussField;
return DecimalData.fromBigDecimal(
decimal.toBigDecimal(), decimal.precision(), decimal.scale());
};
case DATE:
case TIME_WITHOUT_TIME_ZONE:
return (flussField) -> flussField;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (flussField) -> {
TimestampNtz timestampNtz = (TimestampNtz) flussField;
return TimestampData.fromLocalDateTime(timestampNtz.toLocalDateTime());
};
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return (flussField) -> {
TimestampLtz timestampLtz = (TimestampLtz) flussField;
return TimestampData.fromEpochMillis(
timestampLtz.getEpochMillisecond(),
timestampLtz.getNanoOfMillisecond());
};
default:
throw new UnsupportedOperationException("Unsupported data type: " + flussDataType);
}
}