in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java [304:333]
private static SerializableFunction<BsonValue, Object> createMapConverter(MapType mapType) {
LogicalType keyType = mapType.getKeyType();
checkArgument(keyType.supportsInputConversion(String.class));
LogicalType valueType = mapType.getValueType();
SerializableFunction<BsonValue, Object> valueConverter =
createNullSafeInternalConverter(valueType);
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
if (!bsonValue.isDocument()) {
throw new IllegalArgumentException(
"Unable to convert to rowType from unexpected value '"
+ bsonValue
+ "' of type "
+ bsonValue.getBsonType());
}
BsonDocument document = bsonValue.asDocument();
Map<StringData, Object> map = new HashMap<>();
for (String key : document.keySet()) {
map.put(StringData.fromString(key), valueConverter.apply(document.get(key)));
}
return new GenericMapData(map);
}
};
}