private FlussSerializationConverter createInternalConverter()

in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkRowToFlussRowConverter.java [137:181]


    private FlussSerializationConverter createInternalConverter(LogicalType flinkDataType) {
        switch (flinkDataType.getTypeRoot()) {
            case BOOLEAN:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
            case BINARY:
            case VARBINARY:
                return (flinkField) -> flinkField;
            case CHAR:
            case VARCHAR:
                return (flinkField) -> {
                    StringData stringData = (StringData) flinkField;
                    return BinaryString.fromBytes(stringData.toBytes());
                };
            case DECIMAL:
                return (flinkField) -> {
                    DecimalData decimalData = (DecimalData) flinkField;
                    return Decimal.fromBigDecimal(
                            decimalData.toBigDecimal(),
                            decimalData.precision(),
                            decimalData.scale());
                };
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
                return (flussField) -> flussField;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return (flinkField) -> {
                    TimestampData timestampData = (TimestampData) flinkField;
                    return TimestampNtz.fromLocalDateTime(timestampData.toLocalDateTime());
                };
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return (flinkField) -> {
                    TimestampData timestampData = (TimestampData) flinkField;
                    return TimestampLtz.fromEpochMillis(
                            timestampData.getMillisecond(), timestampData.getNanoOfMillisecond());
                };
            default:
                throw new UnsupportedOperationException(
                        "Fluss Unsupported data type: " + flinkDataType);
        }
    }