protected StreamingServerDeserializationConverter createInternalConverter()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/StreamingServerRowConverter.java [169:205]


    /**
     * Builds the converter that maps an incoming field value to the representation this row
     * converter produces for the given logical type.
     *
     * <p>DATE and DECIMAL values are handled as strings, exactly like CHAR/VARCHAR; see the
     * comments on those cases.
     *
     * @param type logical type of the field; its type root selects the conversion
     * @return a converter for values of {@code type}
     * @throws UnsupportedOperationException if the type root is not one of the handled cases
     */
    protected StreamingServerDeserializationConverter createInternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return val -> null;
            case BOOLEAN:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
            case INTERVAL_YEAR_MONTH:
            case INTERVAL_DAY_TIME:
                // These values are passed through unchanged.
                return val -> val;
            case TINYINT:
                // Narrow any numeric value to a byte. Non-numeric values (including null) are
                // passed through instead of failing with ClassCastException/NullPointerException,
                // in line with the tolerant SMALLINT handling below.
                return val -> val instanceof Number ? ((Number) val).byteValue() : val;
            case SMALLINT:
                // Integer values are narrowed to short; anything else is passed through as is.
                return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;

            case TIME_WITHOUT_TIME_ZONE:
                // Nanoseconds of the day divided by 1_000_000, i.e. milliseconds of the day as int.
                return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return val -> TimestampData.fromTimestamp((Timestamp) val);
            case DATE:
                // gRPC does not support the date type, so a string is used to represent the date
                // and it is cast to the date type at the server side.
            case DECIMAL:
                // gRPC does not support the decimal type, so a string is used to represent the
                // decimal and it is cast to the decimal type at the server side.
            case CHAR:
            case VARCHAR:
                return val -> StringData.fromString((String) val);
            case BINARY:
            case VARBINARY:
                return val -> (byte[]) val;
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }