in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java [277:302]
/**
 * Builds a converter that turns a BSON array value into a {@link GenericArrayData}.
 *
 * <p>The element converter is obtained once from {@code createNullSafeInternalConverter} for
 * the array's element type and is reused by every invocation of the returned function.
 *
 * @param arrayType the array type whose element type selects the element converter
 * @return a function that converts each element of a BSON array, in order, and wraps the
 *     results in a {@link GenericArrayData}; it throws {@link IllegalArgumentException} when
 *     applied to a value whose {@code isArray()} check is false
 */
private static SerializableFunction<BsonValue, Object> createArrayConverter(
        ArrayType arrayType) {
    final SerializableFunction<BsonValue, Object> perElement =
            createNullSafeInternalConverter(arrayType.getElementType());

    return new SerializableFunction<BsonValue, Object>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Object apply(BsonValue bsonValue) {
            if (bsonValue.isArray()) {
                final List<BsonValue> source = bsonValue.asArray();
                final Object[] converted = new Object[source.size()];

                // Walk the BSON elements in order, writing each converted value to the
                // matching slot of the output array.
                int slot = 0;
                for (BsonValue element : source) {
                    converted[slot++] = perElement.apply(element);
                }
                return new GenericArrayData(converted);
            }

            // Anything that is not a BSON array cannot be mapped onto the array type.
            throw new IllegalArgumentException(
                    "Unable to convert to arrayType from unexpected value '"
                            + bsonValue
                            + "' of type "
                            + bsonValue.getBsonType());
        }
    };
}