fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlussRowToFlinkRowConverter.java [79:150]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Builds the converter for the given Fluss data type and makes it null-tolerant.
     *
     * <p>The type-specific converter comes from {@code createInternalConverter} and is then passed
     * through {@link #wrapIntoNullableInternalConverter}, so a {@code null} field is returned as
     * {@code null} instead of being handed to the type-specific conversion.
     *
     * @param flussDataType the Fluss data type of the field to convert
     * @return a converter that maps {@code null} to {@code null} and converts any other value
     * @throws UnsupportedOperationException if no converter exists for the type's root
     */
    protected FlussDeserializationConverter createNullableInternalConverter(
            DataType flussDataType) {
        FlussDeserializationConverter nonNullConverter =
                createInternalConverter(flussDataType);
        return wrapIntoNullableInternalConverter(nonNullConverter);
    }

    /**
     * Decorates a converter with a null guard.
     *
     * @param flussDeserializationConverter the converter to delegate to for non-null fields
     * @return a converter that yields {@code null} for a {@code null} field and otherwise returns
     *     whatever the delegate produces
     */
    protected FlussDeserializationConverter wrapIntoNullableInternalConverter(
            FlussDeserializationConverter flussDeserializationConverter) {
        // Short-circuit on null so the delegate never sees a null field.
        return field ->
                field == null ? null : flussDeserializationConverter.deserialize(field);
    }

    /**
     * Runtime converter to convert field in Fluss's {@link InternalRow} to Flink's {@link RowData}
     * type object.
     *
     * <p>The interface has a single abstract method, so it can be implemented with a lambda, and it
     * extends {@link Serializable}.
     */
    @FunctionalInterface
    public interface FlussDeserializationConverter extends Serializable {

        /**
         * Convert a Fluss field object of {@link InternalRow} to the Flink's internal data
         * structure object.
         *
         * @param flussField A single field of a {@link InternalRow}
         * @return the converted object in Flink's internal data representation
         */
        Object deserialize(Object flussField);
    }

    // TODO: use flink row type
    private FlussDeserializationConverter createInternalConverter(DataType flussDataType) {
        switch (flussDataType.getTypeRoot()) {
            case BOOLEAN:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
                return (flussField) -> flussField;
            case CHAR:
            case STRING:
                return (flussField) -> StringData.fromBytes(((BinaryString) flussField).toBytes());
            case BYTES:
            case BINARY:
                return (flussField) -> flussField;
            case DECIMAL:
                return (flussField) -> {
                    Decimal decimal = (Decimal) flussField;
                    return DecimalData.fromBigDecimal(
                            decimal.toBigDecimal(), decimal.precision(), decimal.scale());
                };
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
                return (flussField) -> flussField;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return (flussField) -> {
                    TimestampNtz timestampNtz = (TimestampNtz) flussField;
                    return TimestampData.fromLocalDateTime(timestampNtz.toLocalDateTime());
                };
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return (flussField) -> {
                    TimestampLtz timestampLtz = (TimestampLtz) flussField;
                    return TimestampData.fromEpochMillis(
                            timestampLtz.getEpochMillisecond(),
                            timestampLtz.getNanoOfMillisecond());
                };
            default:
                throw new UnsupportedOperationException("Unsupported data type: " + flussDataType);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/utils/FlussRowToFlinkRowConverter.java [86:157]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Builds the converter for the given Fluss data type and makes it null-tolerant.
     *
     * <p>The type-specific converter comes from {@code createInternalConverter} and is then passed
     * through {@link #wrapIntoNullableInternalConverter}, so a {@code null} field is returned as
     * {@code null} instead of being handed to the type-specific conversion.
     *
     * @param flussDataType the Fluss data type of the field to convert
     * @return a converter that maps {@code null} to {@code null} and converts any other value
     * @throws UnsupportedOperationException if no converter exists for the type's root
     */
    protected FlussDeserializationConverter createNullableInternalConverter(
            DataType flussDataType) {
        FlussDeserializationConverter nonNullConverter =
                createInternalConverter(flussDataType);
        return wrapIntoNullableInternalConverter(nonNullConverter);
    }

    /**
     * Decorates a converter with a null guard.
     *
     * @param flussDeserializationConverter the converter to delegate to for non-null fields
     * @return a converter that yields {@code null} for a {@code null} field and otherwise returns
     *     whatever the delegate produces
     */
    protected FlussDeserializationConverter wrapIntoNullableInternalConverter(
            FlussDeserializationConverter flussDeserializationConverter) {
        // Short-circuit on null so the delegate never sees a null field.
        return field ->
                field == null ? null : flussDeserializationConverter.deserialize(field);
    }

    /**
     * Runtime converter to convert field in Fluss's {@link InternalRow} to Flink's {@link RowData}
     * type object.
     *
     * <p>The interface has a single abstract method, so it can be implemented with a lambda, and it
     * extends {@link Serializable}.
     */
    @FunctionalInterface
    public interface FlussDeserializationConverter extends Serializable {

        /**
         * Convert a Fluss field object of {@link InternalRow} to the Flink's internal data
         * structure object.
         *
         * @param flussField A single field of a {@link InternalRow}
         * @return the converted object in Flink's internal data representation
         */
        Object deserialize(Object flussField);
    }

    // TODO: use flink row type
    private FlussDeserializationConverter createInternalConverter(DataType flussDataType) {
        switch (flussDataType.getTypeRoot()) {
            case BOOLEAN:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
                return (flussField) -> flussField;
            case CHAR:
            case STRING:
                return (flussField) -> StringData.fromBytes(((BinaryString) flussField).toBytes());
            case BYTES:
            case BINARY:
                return (flussField) -> flussField;
            case DECIMAL:
                return (flussField) -> {
                    Decimal decimal = (Decimal) flussField;
                    return DecimalData.fromBigDecimal(
                            decimal.toBigDecimal(), decimal.precision(), decimal.scale());
                };
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
                return (flussField) -> flussField;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return (flussField) -> {
                    TimestampNtz timestampNtz = (TimestampNtz) flussField;
                    return TimestampData.fromLocalDateTime(timestampNtz.toLocalDateTime());
                };
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return (flussField) -> {
                    TimestampLtz timestampLtz = (TimestampLtz) flussField;
                    return TimestampData.fromEpochMillis(
                            timestampLtz.getEpochMillisecond(),
                            timestampLtz.getNanoOfMillisecond());
                };
            default:
                throw new UnsupportedOperationException("Unsupported data type: " + flussDataType);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



