private static SerializableFunction createMapConverter()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java [309:340]


    private static SerializableFunction<Object, BsonValue> createMapConverter(MapType mapType) {
        final LogicalType keyType = mapType.getKeyType();
        final LogicalType valueType = mapType.getValueType();
        if (!keyType.is(LogicalTypeFamily.CHARACTER_STRING)) {
            throw new UnsupportedOperationException(
                    "MongoDB doesn't support non-string as key type of map. "
                            + "The type is: "
                            + keyType.asSummaryString());
        }
        final SerializableFunction<Object, BsonValue> valueConverter =
                createNullSafeInternalConverter(valueType);
        final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);

        return new SerializableFunction<Object, BsonValue>() {
            private static final long serialVersionUID = 1L;

            @Override
            public BsonValue apply(Object value) {
                final MapData mapData = (MapData) value;
                final ArrayData keyArray = mapData.keyArray();
                final ArrayData valueArray = mapData.valueArray();
                final BsonDocument document = new BsonDocument();
                for (int i = 0; i < mapData.size(); i++) {
                    final String key = keyArray.getString(i).toString();
                    final BsonValue bsonValue =
                            valueConverter.apply(valueGetter.getElementOrNull(valueArray, i));
                    document.append(key, bsonValue);
                }
                return document;
            }
        };
    }