in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java [103:124]
/**
 * Decorates {@code internalConverter} with null handling.
 *
 * <p>The returned function treats its input as null when the input itself is {@code null} or
 * when the root of {@code type} is {@link LogicalTypeRoot#NULL}. In that case it yields {@link
 * BsonNull#VALUE} if {@code type} is nullable and fails otherwise. Every other input is passed
 * unchanged to {@code internalConverter}.
 *
 * @param internalConverter converter invoked for inputs that are not treated as null
 * @param type logical type of the column, consulted for its type root and nullability
 * @return a function applying the null handling described above before delegating
 */
private static SerializableFunction<Object, BsonValue> wrapIntoNullSafeInternalConverter(
        SerializableFunction<Object, BsonValue> internalConverter, LogicalType type) {
    return new SerializableFunction<Object, BsonValue>() {
        private static final long serialVersionUID = 1L;

        /**
         * @throws IllegalArgumentException if the input is treated as null while the column
         *     type is not nullable
         */
        @Override
        public BsonValue apply(Object value) {
            final boolean treatAsNull =
                    value == null || type.getTypeRoot() == LogicalTypeRoot.NULL;

            // Common path: a real value goes straight to the wrapped converter.
            if (!treatAsNull) {
                return internalConverter.apply(value);
            }

            // A null may only be emitted when the column type allows it.
            if (!type.isNullable()) {
                throw new IllegalArgumentException(
                        "The column type is <" + type + ">, but a null value is being written into it");
            }
            return BsonNull.VALUE;
        }
    };
}