in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java [205:236]
/**
 * Splits the offered filters into those pushed down to MongoDB and those left for the
 * caller to evaluate.
 *
 * <p>With the {@code NEVER} policy nothing is accepted and every filter is returned as
 * remaining. For any other policy each filter is passed through {@code parseFilter}: a
 * non-empty document means the filter is accepted and pushed down, an empty document means
 * it stays in the remaining list (presumably because the expression could not be
 * translated; confirm against {@code parseFilter}).
 *
 * <p>When at least one filter is accepted, the translated documents are combined (a single
 * document is used as-is, several are joined with {@code Filters.and}) and stored in
 * {@code this.filter}. When none is accepted, {@code this.filter} is left untouched.
 *
 * @param filters the filter expressions offered for push-down
 * @return the accepted filters together with the remaining ones
 */
public Result applyFilters(List<ResolvedExpression> filters) {
    switch (filterHandlingPolicy) {
        case NEVER:
            // Push-down is disabled: accept nothing and hand every filter back.
            return Result.of(Collections.emptyList(), filters);
        case ALWAYS:
        default:
            break;
    }

    List<ResolvedExpression> pushed = new ArrayList<>();
    List<ResolvedExpression> retained = new ArrayList<>();
    List<Bson> translated = new ArrayList<>();
    for (ResolvedExpression candidate : filters) {
        BsonDocument document = parseFilter(candidate);
        if (!document.isEmpty()) {
            pushed.add(candidate);
            translated.add(document);
        } else {
            retained.add(candidate);
        }
    }

    // Nothing could be translated, so the stored filter is not modified.
    if (translated.isEmpty()) {
        return Result.of(pushed, retained);
    }

    // A lone filter is used directly; several are combined into one conjunction.
    Bson combined;
    if (translated.size() == 1) {
        combined = translated.get(0);
    } else {
        combined = Filters.and(translated);
    }
    this.filter = combined.toBsonDocument();
    LOG.info("Pushed down filters: {}", this.filter.toJson());
    return Result.of(pushed, retained);
}