public Result applyFilters()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java [205:236]


    /**
     * Splits the given filter expressions into those that are pushed down to MongoDB and those
     * that are handed back to the caller as remaining.
     *
     * <p>When the handling policy is {@code NEVER}, nothing is accepted and every filter is
     * returned as remaining. For any other policy, each expression is passed to {@code
     * parseFilter}; an expression that yields a non-empty document is accepted, one that yields
     * an empty document is kept as remaining.
     *
     * <p>If at least one expression was accepted, the resulting documents are stored in {@code
     * this.filter} (a single document as-is, several combined with {@code Filters.and}) and
     * logged at INFO level. If none was accepted, {@code this.filter} is left untouched.
     *
     * @param filters the filter expressions offered for push-down
     * @return the accepted filters together with the remaining ones
     */
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Guard: with push-down disabled every predicate is handed back untouched.
        switch (filterHandlingPolicy) {
            case NEVER:
                return Result.of(Collections.emptyList(), filters);
            case ALWAYS:
            default:
                break;
        }

        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        List<Bson> pushedDown = new ArrayList<>();

        for (ResolvedExpression expression : filters) {
            BsonDocument translated = parseFilter(expression);
            if (!translated.isEmpty()) {
                accepted.add(expression);
                pushedDown.add(translated);
            } else {
                // An empty document means the expression was not translated into a Mongo filter.
                remaining.add(expression);
            }
        }

        if (!pushedDown.isEmpty()) {
            Bson combined;
            if (pushedDown.size() == 1) {
                // A lone predicate is used directly instead of being wrapped in an $and.
                combined = pushedDown.get(0);
            } else {
                combined = Filters.and(pushedDown);
            }
            this.filter = combined.toBsonDocument();
            LOG.info("Pushed down filters: {}", this.filter.toJson());
        }

        return Result.of(accepted, remaining);
    }