in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java [103:118]
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    // Builds the input-format based scan provider for this table source.
    //
    // The predicate list handed to the input format is assembled in a local list
    // instead of appending to the 'predicates' field: appending to the field would
    // add the same converted filters again every time this method is invoked on the
    // same instance, so the list would grow with duplicates.
    java.util.List<KuduFilterInfo> scanPredicates = Lists.newArrayList();
    if (this.predicates != null) {
        // Start from whatever predicates are already configured on this source.
        scanPredicates.addAll(this.predicates);
    }

    if (CollectionUtils.isNotEmpty(this.filters)) {
        for (ResolvedExpression filter : this.filters) {
            Optional<KuduFilterInfo> converted = KuduTableUtils.toKuduFilterInfo(filter);
            // Filters for which the conversion yields no KuduFilterInfo are skipped.
            // The null check is kept as a defensive guard against a null Optional.
            if (converted != null && converted.isPresent()) {
                scanPredicates.add(converted.get());
            }
        }
    }

    KuduRowDataInputFormat inputFormat = new KuduRowDataInputFormat(
            configBuilder.build(),
            new RowResultRowDataConvertor(),
            tableInfo,
            scanPredicates,
            // A null projection is passed through unchanged.
            projectedFields == null ? null : Lists.newArrayList(projectedFields));
    return InputFormatProvider.of(inputFormat);
}