in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkRowToFlussRowConverter.java [137:181]
/**
 * Builds the per-field converter for a single Flink logical type.
 *
 * <p>The converter is selected from the type root of {@code flinkDataType}:
 *
 * <ul>
 *   <li>boolean, integral, floating point, binary, date and time-of-day values are handed
 *       back unchanged;
 *   <li>{@code CHAR}/{@code VARCHAR} values are cast to {@link StringData} and their bytes
 *       are wrapped via {@code BinaryString.fromBytes};
 *   <li>{@code DECIMAL} values are cast to {@link DecimalData} and rebuilt via {@code
 *       Decimal.fromBigDecimal}, carrying over the value's own precision and scale;
 *   <li>timestamps without time zone are cast to {@link TimestampData} and rebuilt via
 *       {@code TimestampNtz.fromLocalDateTime};
 *   <li>timestamps with local time zone are cast to {@link TimestampData} and rebuilt via
 *       {@code TimestampLtz.fromEpochMillis} from the millisecond and nano-of-millisecond
 *       parts.
 * </ul>
 *
 * <p>The returned converters dereference the field after casting it, so they do not
 * handle {@code null} fields themselves.
 *
 * @param flinkDataType the Flink logical type of the field to convert
 * @return a converter turning a Flink field value of that type into its Fluss counterpart
 * @throws UnsupportedOperationException if the type root is not one of the cases above
 */
private FlussSerializationConverter createInternalConverter(LogicalType flinkDataType) {
    switch (flinkDataType.getTypeRoot()) {
        // Identity group: the incoming object is returned as-is.
        case BOOLEAN:
        case TINYINT:
        case SMALLINT:
        case INTEGER:
        case BIGINT:
        case FLOAT:
        case DOUBLE:
        case BINARY:
        case VARBINARY:
        case DATE:
        case TIME_WITHOUT_TIME_ZONE:
            return (value) -> value;

        // Character data: re-wrap the string's bytes.
        case CHAR:
        case VARCHAR:
            return (value) -> BinaryString.fromBytes(((StringData) value).toBytes());

        // Decimal: precision and scale are read from the value, not from the type.
        case DECIMAL:
            return (value) -> {
                final DecimalData source = (DecimalData) value;
                return Decimal.fromBigDecimal(
                        source.toBigDecimal(), source.precision(), source.scale());
            };

        // Timestamp without time zone: goes through a LocalDateTime.
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            return (value) -> {
                final TimestampData source = (TimestampData) value;
                return TimestampNtz.fromLocalDateTime(source.toLocalDateTime());
            };

        // Timestamp with local time zone: rebuilt from millis + nanos-of-millisecond.
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return (value) -> {
                final TimestampData source = (TimestampData) value;
                return TimestampLtz.fromEpochMillis(
                        source.getMillisecond(), source.getNanoOfMillisecond());
            };

        default:
            throw new UnsupportedOperationException(
                    "Fluss Unsupported data type: " + flinkDataType);
    }
}