private static SerializableFunction wrapIntoNullSafeInternalConverter()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java [103:124]


    private static SerializableFunction<Object, BsonValue> wrapIntoNullSafeInternalConverter(
            SerializableFunction<Object, BsonValue> internalConverter, LogicalType type) {
        return new SerializableFunction<Object, BsonValue>() {
            private static final long serialVersionUID = 1L;

            @Override
            public BsonValue apply(Object value) {
                if (value == null || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
                    if (type.isNullable()) {
                        return BsonNull.VALUE;
                    } else {
                        throw new IllegalArgumentException(
                                "The column type is <"
                                        + type
                                        + ">, but a null value is being written into it");
                    }
                } else {
                    return internalConverter.apply(value);
                }
            }
        };
    }