private static SerializableFunction createRowConverter()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/converter/BsonToRowDataConverters.java [242:275]


    /**
     * Creates a converter that maps a BSON document onto a {@link GenericRowData} laid out
     * according to the given row type.
     *
     * <p>The per-field converters and the field names are resolved once, when this method is
     * called. The returned function then only looks each field up by name in the document and
     * hands the lookup result to the converter at the same position.
     *
     * @param rowType row type supplying the field names and field types of the target row
     * @return a serializable function producing a {@link GenericRowData} for a document value; it
     *     throws {@link IllegalArgumentException} when the value is not a document
     */
    private static SerializableFunction<BsonValue, Object> createRowConverter(RowType rowType) {
        final int fieldCount = rowType.getFieldCount();

        // Generic arrays cannot be instantiated directly; every slot is filled below with a
        // SerializableFunction<BsonValue, Object>, so the unchecked assignment is safe.
        @SuppressWarnings("unchecked")
        final SerializableFunction<BsonValue, Object>[] converters =
                new SerializableFunction[fieldCount];
        int slot = 0;
        for (RowType.RowField field : rowType.getFields()) {
            converters[slot++] = createNullSafeInternalConverter(field.getType());
        }

        final String[] names = rowType.getFieldNames().toArray(new String[0]);

        return new SerializableFunction<BsonValue, Object>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object apply(BsonValue bsonValue) {
                if (bsonValue.isDocument()) {
                    final BsonDocument source = bsonValue.asDocument();
                    final GenericRowData target = new GenericRowData(fieldCount);
                    for (int pos = 0; pos < fieldCount; pos++) {
                        // The lookup result is passed on as-is; dealing with a missing value is
                        // left to the field converter (built by createNullSafeInternalConverter).
                        target.setField(pos, converters[pos].apply(source.get(names[pos])));
                    }
                    return target;
                }

                // Anything other than a document cannot be mapped onto a row.
                throw new IllegalArgumentException(
                        "Unable to convert to rowType from unexpected value '"
                                + bsonValue
                                + "' of type "
                                + bsonValue.getBsonType());
            }
        };
    }