public Collection lookup()

in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/function/lookup/KuduRowDataLookupFunction.java [83:114]


    public Collection<RowData> lookup(RowData keyRow) {
        for (int retry = 1; retry <= maxRetryTimes; retry++) {
            try {
                List<KuduFilterInfo> kuduFilterInfos = buildKuduFilterInfo((GenericRowData) keyRow);
                this.kuduReader.setTableFilters(kuduFilterInfos);
                KuduInputSplit[] inputSplits = kuduReader.createInputSplits(1);
                ArrayList<RowData> rows = new ArrayList<>();
                for (KuduInputSplit inputSplit : inputSplits) {
                    KuduReaderIterator<RowData> scanner =
                            kuduReader.scanner(inputSplit.getScanToken());
                    while (scanner.hasNext()) {
                        RowData row = scanner.next();
                        rows.add(row);
                    }
                }
                rows.trimToSize();
                return rows;
            } catch (Exception e) {
                LOG.error(String.format("Kudu scan error, retry times = %d", retry), e);
                if (retry >= maxRetryTimes) {
                    throw new RuntimeException("Execution of Kudu scan failed.", e);
                }
                try {
                    Thread.sleep(1000L * retry);
                } catch (InterruptedException e1) {
                    throw new RuntimeException(e1);
                }
            }
        }

        return Collections.emptyList();
    }