in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java [252:283]
private static SerializableFunction<Object, BsonValue> createRowConverter(RowType rowType) {
final SerializableFunction<Object, BsonValue>[] fieldConverters =
rowType.getChildren().stream()
.map(RowDataToBsonConverters::createNullSafeInternalConverter)
.toArray(SerializableFunction[]::new);
final LogicalType[] fieldTypes =
rowType.getFields().stream()
.map(RowType.RowField::getType)
.toArray(LogicalType[]::new);
final int fieldCount = rowType.getFieldCount();
final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldCount];
for (int i = 0; i < fieldCount; i++) {
fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
}
return new SerializableFunction<Object, BsonValue>() {
private static final long serialVersionUID = 1L;
@Override
public BsonValue apply(Object value) {
final RowData rowData = (RowData) value;
final BsonDocument document = new BsonDocument();
for (int i = 0; i < fieldCount; i++) {
String fieldName = rowType.getFieldNames().get(i);
Object fieldValue = fieldGetters[i].getFieldOrNull(rowData);
document.append(fieldName, fieldConverters[i].apply(fieldValue));
}
return document;
}
};
}