public void eval()

in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java [91:134]


    /**
     * Looks up the rows matching the given key values and emits each of them via
     * {@code collect}.
     *
     * <p>If a cache is configured and already holds an entry for the key, the cached rows are
     * emitted and no read is performed. Otherwise every partition returned by
     * {@code getPartitions(keys)} is read with a {@code DorisValueReader}; a failed attempt is
     * retried until {@code maxRetryTimes} retries have been used up. Rows are buffered per
     * attempt and only emitted once an attempt has read all partitions successfully, so a
     * failure in the middle of a read never results in the same rows being emitted twice.
     *
     * @param keys the lookup key values
     * @throws RuntimeException if the read still fails on the last allowed attempt, or if the
     *     thread is interrupted while waiting between attempts
     */
    public void eval(Object... keys) {
        RowData keyRow = GenericRowData.of(keys);
        if (cache != null) {
            List<RowData> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (RowData cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }

        List<PartitionDefinition> partitions = getPartitions(keys);
        // Stays null until one attempt has read every partition without an exception.
        ArrayList<RowData> rows = null;
        for (int retry = 0; rows == null && retry <= maxRetryTimes; retry++) {
            // Fresh buffer per attempt: rows from a partially failed attempt are discarded.
            ArrayList<RowData> attempt = new ArrayList<>();
            try {
                for (PartitionDefinition part : partitions) {
                    try (DorisValueReader valueReader = new DorisValueReader(part, options, readOptions)) {
                        while (valueReader.hasNext()) {
                            List<?> record = valueReader.next();
                            attempt.add(rowConverter.convertInternal(record));
                        }
                    }
                }
                rows = attempt;
            } catch (Exception ex) {
                logger.error(String.format("Read Doris error, retry times = %d", retry), ex);
                if (retry >= maxRetryTimes) {
                    throw new RuntimeException("Read Doris failed.", ex);
                }
                try {
                    // Linear back-off; the wait before the first retry (retry == 0) is 0 ms.
                    Thread.sleep(1000L * retry);
                } catch (InterruptedException e1) {
                    // Restore the interrupt flag so callers further up can observe it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e1);
                }
            }
        }

        if (rows == null) {
            // No attempt was made (negative maxRetryTimes): nothing to emit or cache.
            return;
        }

        // Emit and cache outside the retry loop so that a failure here is not mistaken for a
        // read error and does not trigger a re-read.
        for (RowData row : rows) {
            collect(row);
        }
        if (cache != null) {
            rows.trimToSize();
            cache.put(keyRow, rows);
        }
    }