in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java [86:140]
/**
 * Looks up the rows matching the given join keys and emits each of them via {@code collect}.
 *
 * <p>If a cache is configured and holds a non-empty entry for the key, the cached rows are
 * emitted and Kudu is not queried. Otherwise the table is scanned with filters built from the
 * keys, making up to {@code maxRetryTimes} attempts with a linearly growing pause between
 * them. Rows are emitted only after a scan attempt has completed successfully, so a failed
 * attempt never leaks partial results and a retry never emits the same row twice.
 *
 * @param keys join key values; must have the same length as {@code keyNames}
 * @throws RuntimeException if the number of keys does not match, if the last scan attempt
 *     fails, or if the thread is interrupted while waiting before a retry
 */
public void eval(Object... keys) {
    if (keys.length != keyNames.length) {
        throw new RuntimeException("The join keys are of unequal lengths");
    }
    // cache key
    RowData keyRow = buildCacheKey(keys);
    if (this.cache != null) {
        List<RowData> cacheRows = this.cache.getIfPresent(keyRow);
        // Only a non-empty cached entry short-circuits the lookup.
        if (CollectionUtils.isNotEmpty(cacheRows)) {
            for (RowData cacheRow : cacheRows) {
                collect(cacheRow);
            }
            return;
        }
    }

    // Result of the first successful attempt; stays null if no attempt was made.
    ArrayList<RowData> rows = null;
    for (int retry = 1; retry <= maxRetryTimes; retry++) {
        try {
            List<KuduFilterInfo> kuduFilterInfos = buildKuduFilterInfo(keys);
            this.kuduReader.setTableFilters(kuduFilterInfos);
            KuduInputSplit[] inputSplits = kuduReader.createInputSplits(1);
            // Buffer per attempt instead of emitting while scanning: rows collected before a
            // mid-scan failure would otherwise be emitted again by the next attempt.
            ArrayList<RowData> scanned = new ArrayList<>();
            for (KuduInputSplit inputSplit : inputSplits) {
                KuduReaderIterator<RowData> scanner =
                        kuduReader.scanner(inputSplit.getScanToken());
                while (scanner.hasNext()) {
                    scanned.add(scanner.next());
                }
            }
            rows = scanned;
            break;
        } catch (Exception e) {
            LOG.error(String.format("Kudu scan error, retry times = %d", retry), e);
            if (retry >= maxRetryTimes) {
                throw new RuntimeException("Execution of Kudu scan failed.", e);
            }
            try {
                // Linear back-off: 1s, 2s, 3s, ...
                Thread.sleep(1000L * retry);
            } catch (InterruptedException e1) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e1);
            }
        }
    }

    if (rows == null) {
        // maxRetryTimes < 1: no scan was attempted, so there is nothing to emit.
        return;
    }
    if (cache != null) {
        // The list is retained by the cache, so drop unused backing-array capacity.
        rows.trimToSize();
        cache.put(keyRow, rows);
    }
    // Emit outside the retry loop so a failure in collect() is not mistaken for a scan error.
    for (RowData row : rows) {
        collect(row);
    }
}