private static SerializableFunction createMapConverter()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java [304:333]


    private static SerializableFunction<BsonValue, Object> createMapConverter(MapType mapType) {
        LogicalType keyType = mapType.getKeyType();
        checkArgument(keyType.supportsInputConversion(String.class));

        LogicalType valueType = mapType.getValueType();
        SerializableFunction<BsonValue, Object> valueConverter =
                createNullSafeInternalConverter(valueType);

        return new SerializableFunction<BsonValue, Object>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object apply(BsonValue bsonValue) {
                if (!bsonValue.isDocument()) {
                    throw new IllegalArgumentException(
                            "Unable to convert to rowType from unexpected value '"
                                    + bsonValue
                                    + "' of type "
                                    + bsonValue.getBsonType());
                }

                BsonDocument document = bsonValue.asDocument();
                Map<StringData, Object> map = new HashMap<>();
                for (String key : document.keySet()) {
                    map.put(StringData.fromString(key), valueConverter.apply(document.get(key)));
                }
                return new GenericMapData(map);
            }
        };
    }