in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java [146:163]
public TableSource<Row> applyPredicate(List<Expression> predicates) {
List<KuduFilterInfo> kuduPredicates = new ArrayList<>();
ListIterator<Expression> predicatesIter = predicates.listIterator();
while (predicatesIter.hasNext()) {
Expression predicate = predicatesIter.next();
Optional<KuduFilterInfo> kuduPred = toKuduFilterInfo(predicate);
if (kuduPred != null && kuduPred.isPresent()) {
LOG.debug("Predicate [{}] converted into KuduFilterInfo and pushed into " +
"KuduTable [{}].", predicate, tableInfo.getName());
kuduPredicates.add(kuduPred.get());
predicatesIter.remove();
} else {
LOG.debug("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
predicate, tableInfo.getName());
}
}
return new KuduTableSource(configBuilder, tableInfo, flinkSchema, kuduPredicates, projectedFields);
}