in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableSource.java [112:131]
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
if (CollectionUtils.isNotEmpty(filters)) {
for (ResolvedExpression filter : filters) {
Optional<KuduFilterInfo> kuduFilterInfo = KuduTableUtils.toKuduFilterInfo(filter);
if (kuduFilterInfo != null && kuduFilterInfo.isPresent()) {
predicates.add(kuduFilterInfo.get());
}
}
}
KuduRowDataInputFormat inputFormat =
new KuduRowDataInputFormat(
configBuilder.build(),
new RowResultRowDataConverter(),
tableInfo,
predicates,
DataType.getFieldNames(physicalRowDataType));
return InputFormatProvider.of(inputFormat);
}