private FlussDeserializationConverter createInternalConverter()

in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlussRowToFlinkRowConverter.java [112:152]


    private FlussDeserializationConverter createInternalConverter(DataType flussDataType) {
        switch (flussDataType.getTypeRoot()) {
            case BOOLEAN:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
                return (flussField) -> flussField;
            case CHAR:
            case STRING:
                return (flussField) -> StringData.fromBytes(((BinaryString) flussField).toBytes());
            case BYTES:
            case BINARY:
                return (flussField) -> flussField;
            case DECIMAL:
                return (flussField) -> {
                    Decimal decimal = (Decimal) flussField;
                    return DecimalData.fromBigDecimal(
                            decimal.toBigDecimal(), decimal.precision(), decimal.scale());
                };
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
                return (flussField) -> flussField;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return (flussField) -> {
                    TimestampNtz timestampNtz = (TimestampNtz) flussField;
                    return TimestampData.fromLocalDateTime(timestampNtz.toLocalDateTime());
                };
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return (flussField) -> {
                    TimestampLtz timestampLtz = (TimestampLtz) flussField;
                    return TimestampData.fromEpochMillis(
                            timestampLtz.getEpochMillisecond(),
                            timestampLtz.getNanoOfMillisecond());
                };
            default:
                throw new UnsupportedOperationException("Unsupported data type: " + flussDataType);
        }
    }