in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java [242:275]
private static SerializableFunction<BsonValue, Object> createRowConverter(RowType rowType) {
final SerializableFunction<BsonValue, Object>[] fieldConverters =
rowType.getFields().stream()
.map(RowType.RowField::getType)
.map(BsonToRowDataConverters::createNullSafeInternalConverter)
.toArray(SerializableFunction[]::new);
final int arity = rowType.getFieldCount();
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object apply(BsonValue bsonValue) {
if (!bsonValue.isDocument()) {
throw new IllegalArgumentException(
"Unable to convert to rowType from unexpected value '"
+ bsonValue
+ "' of type "
+ bsonValue.getBsonType());
}
BsonDocument document = bsonValue.asDocument();
GenericRowData row = new GenericRowData(arity);
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
BsonValue fieldValue = document.get(fieldName);
Object convertedField = fieldConverters[i].apply(fieldValue);
row.setField(i, convertedField);
}
return row;
}
};
}