in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java [91:134]
public void eval(Object... keys) {
RowData keyRow = GenericRowData.of(keys);
if (cache != null) {
List<RowData> cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (RowData cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}
List<PartitionDefinition> partitions = getPartitions(keys);
for (int retry = 0; retry <= maxRetryTimes; retry++) {
try {
ArrayList<RowData> rows = new ArrayList<>();
for (PartitionDefinition part : partitions) {
try (DorisValueReader valueReader = new DorisValueReader(part, options, readOptions)) {
while (valueReader.hasNext()) {
List<?> record = valueReader.next();
GenericRowData rowData = rowConverter.convertInternal(record);
rows.add(rowData);
collect(rowData);
}
}
}
if(cache != null){
rows.trimToSize();
cache.put(keyRow, rows);
}
break;
} catch (Exception ex) {
logger.error(String.format("Read Doris error, retry times = %d", retry), ex);
if (retry >= maxRetryTimes) {
throw new RuntimeException("Read Doris failed.", ex);
}
try {
Thread.sleep(1000 * retry);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
}
}
}