private static SerializableFunction createRowConverter()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/RowDataToBsonConverters.java [252:283]


    private static SerializableFunction<Object, BsonValue> createRowConverter(RowType rowType) {
        final SerializableFunction<Object, BsonValue>[] fieldConverters =
                rowType.getChildren().stream()
                        .map(RowDataToBsonConverters::createNullSafeInternalConverter)
                        .toArray(SerializableFunction[]::new);
        final LogicalType[] fieldTypes =
                rowType.getFields().stream()
                        .map(RowType.RowField::getType)
                        .toArray(LogicalType[]::new);

        final int fieldCount = rowType.getFieldCount();
        final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldCount];
        for (int i = 0; i < fieldCount; i++) {
            fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
        }

        return new SerializableFunction<Object, BsonValue>() {
            private static final long serialVersionUID = 1L;

            @Override
            public BsonValue apply(Object value) {
                final RowData rowData = (RowData) value;
                final BsonDocument document = new BsonDocument();
                for (int i = 0; i < fieldCount; i++) {
                    String fieldName = rowType.getFieldNames().get(i);
                    Object fieldValue = fieldGetters[i].getFieldOrNull(rowData);
                    document.append(fieldName, fieldConverters[i].apply(fieldValue));
                }
                return document;
            }
        };
    }