in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java [274:298]
/**
 * Partitions the offered filter expressions into an "accepted" list and a "remaining" list
 * and returns both via {@code Result.of(accepted, remaining)}.
 *
 * <p>When {@code filterHandlingPolicy} is {@code NEVER}, nothing is accepted: the accepted
 * list is empty and the input list is passed through unchanged as the remaining list. For
 * {@code ALWAYS} (and any other policy value) each filter is handed to
 * {@code parseFilterToPredicate}; a filter is accepted only when that call yields a value.
 *
 * <p>Side effects: for every accepted filter, the predicate's parameters are appended to
 * {@code this.pushdownParams} and its predicate is added to {@code this.resolvedPredicates}.
 * This method never clears those fields, so the state grows with each accepted filter.
 *
 * @param filters the filter expressions to examine, processed in iteration order
 * @return the result pairing the accepted filters with the remaining ones
 */
public Result applyFilters(List<ResolvedExpression> filters) {
    switch (filterHandlingPolicy) {
        case NEVER:
            // Push-down is switched off: hand every filter back untouched.
            return Result.of(Collections.emptyList(), filters);
        case ALWAYS:
        default:
            // Every other policy shares the push-down path below.
            break;
    }

    List<ResolvedExpression> pushedDown = new ArrayList<>();
    List<ResolvedExpression> leftOver = new ArrayList<>();

    for (ResolvedExpression candidate : filters) {
        Optional<ParameterizedPredicate> parsed = parseFilterToPredicate(candidate);
        if (!parsed.isPresent()) {
            // No predicate was produced for this expression, so it is not accepted.
            leftOver.add(candidate);
            continue;
        }

        pushedDown.add(candidate);
        ParameterizedPredicate predicate = parsed.get();
        // Record the parameters first, then the predicate, so both stay in the same order.
        this.pushdownParams = ArrayUtils.addAll(this.pushdownParams, predicate.getParameters());
        this.resolvedPredicates.add(predicate.getPredicate());
    }

    return Result.of(pushedDown, leftOver);
}