in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java [309:340]
/**
 * Creates a converter that turns a Flink {@link MapData} value into a {@link BsonDocument},
 * using each map key as a document field name and each converted map value as the field value.
 *
 * <p>Only character-string key types are accepted, because the key is used directly as the
 * field name of the resulting document. Values are converted with the converter returned by
 * {@code createNullSafeInternalConverter} for the map's value type.
 *
 * @param mapType the logical map type describing the key and value types
 * @return a serializable function that expects a {@link MapData} argument and returns the
 *     corresponding {@link BsonDocument}
 * @throws UnsupportedOperationException if the key type is not a character string type
 */
private static SerializableFunction<Object, BsonValue> createMapConverter(MapType mapType) {
    final LogicalType keyType = mapType.getKeyType();
    if (!keyType.is(LogicalTypeFamily.CHARACTER_STRING)) {
        throw new UnsupportedOperationException(
                "MongoDB doesn't support non-string as key type of map. "
                        + "The type is: "
                        + keyType.asSummaryString());
    }

    final LogicalType valueType = mapType.getValueType();
    final SerializableFunction<Object, BsonValue> valueConverter =
            createNullSafeInternalConverter(valueType);
    final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);

    return new SerializableFunction<Object, BsonValue>() {
        private static final long serialVersionUID = 1L;

        /**
         * Converts one map into a BSON document.
         *
         * @param value the map to convert; must be a {@link MapData}
         * @return the document holding one field per map entry
         * @throws IllegalArgumentException if the map contains a null key
         */
        @Override
        public BsonValue apply(Object value) {
            final MapData mapData = (MapData) value;
            final ArrayData keyArray = mapData.keyArray();
            final ArrayData valueArray = mapData.valueArray();
            // The size does not change while converting, so read it once.
            final int size = mapData.size();

            final BsonDocument document = new BsonDocument();
            for (int i = 0; i < size; i++) {
                // A null key cannot become a field name. Fail with a clear message instead
                // of dereferencing a null StringData or reading an unset slot.
                if (keyArray.isNullAt(i)) {
                    throw new IllegalArgumentException(
                            "MongoDB doesn't support null as key of map.");
                }
                final String key = keyArray.getString(i).toString();
                // The element getter yields null for null slots; the value converter is the
                // null-safe variant and is handed that null as-is.
                final BsonValue bsonValue =
                        valueConverter.apply(valueGetter.getElementOrNull(valueArray, i));
                document.append(key, bsonValue);
            }
            return document;
        }
    };
}