in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java [95:116]
private static SerializableFunction<BsonValue, Object> wrapIntoNullSafeInternalConverter(
SerializableFunction<BsonValue, Object> internalConverter, LogicalType type) {
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
if (isBsonValueNull(bsonValue) || isBsonDecimalNaN(bsonValue)) {
if (type.isNullable()) {
return null;
} else {
throw new IllegalArgumentException(
"Unable to convert to <"
+ type
+ "> from nullable value "
+ bsonValue);
}
}
return internalConverter.apply(bsonValue);
}
};
}