in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/function/lookup/KuduRowDataLookupFunction.java [83:114]
/**
 * Looks up the Kudu rows that match the given lookup key.
 *
 * <p>The key row is converted into Kudu filters, the filters are installed on the shared
 * reader, and every input split produced by the reader is scanned to completion. All rows
 * returned by the scanners are collected into a single list.
 *
 * <p>A failed scan is retried with a linearly growing pause ({@code 1000 ms * attempt}) until
 * {@code maxRetryTimes} attempts have been made. At least one attempt is always made, even if
 * {@code maxRetryTimes} is zero or negative.
 *
 * @param keyRow the lookup key; it is cast to {@code GenericRowData} before the filters are built
 * @return the rows read from Kudu for this key (empty if nothing matched)
 * @throws RuntimeException if the last attempt fails, or if the thread is interrupted while
 *     waiting before a retry (the interrupt status is restored in that case)
 */
public Collection<RowData> lookup(RowData keyRow) {
    // No loop condition on purpose: the first attempt must always run, and the loop is only
    // ever left through the return below or through one of the throws in the catch block.
    for (int retry = 1; ; retry++) {
        try {
            List<KuduFilterInfo> kuduFilterInfos = buildKuduFilterInfo((GenericRowData) keyRow);
            this.kuduReader.setTableFilters(kuduFilterInfos);
            KuduInputSplit[] inputSplits = kuduReader.createInputSplits(1);
            ArrayList<RowData> rows = new ArrayList<>();
            for (KuduInputSplit inputSplit : inputSplits) {
                // NOTE(review): the scanner is never explicitly closed here; if
                // KuduReaderIterator exposes a close method, consider calling it in a
                // finally block so a failed scan does not leave it open - TODO confirm.
                KuduReaderIterator<RowData> scanner =
                        kuduReader.scanner(inputSplit.getScanToken());
                while (scanner.hasNext()) {
                    rows.add(scanner.next());
                }
            }
            rows.trimToSize();
            return rows;
        } catch (Exception e) {
            LOG.error(String.format("Kudu scan error, retry times = %d", retry), e);
            if (retry >= maxRetryTimes) {
                throw new RuntimeException("Execution of Kudu scan failed.", e);
            }
            try {
                // Linear backoff before the next attempt.
                Thread.sleep(1000L * retry);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt status so callers further up the stack can still
                // observe the cancellation, and keep the scan failure for diagnostics.
                Thread.currentThread().interrupt();
                interrupted.addSuppressed(e);
                throw new RuntimeException(
                        "Interrupted while waiting to retry Kudu scan.", interrupted);
            }
        }
    }
}