in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java [86:119]
/**
 * Asynchronously looks up the rows matching the given lookup keys and completes
 * {@code future} with the outcome.
 *
 * <p>The keys are wrapped into a {@link GenericRowData}. When a cache is configured and
 * already holds an entry for that key row, {@code future} is completed immediately with
 * the cached rows and the reader is not consulted. Otherwise the lookup is delegated to
 * {@code lookupReader.asyncGet}, and a callback registered with {@code handleAsync}
 * (no explicit executor is supplied) stores the rows in the cache, if any, and completes
 * {@code future}. A {@code null} or empty result is normalized to
 * {@link Collections#emptyList()} both for the cache entry and for the completed value.
 *
 * @param future the future to complete with the matching rows, or exceptionally when the
 *     lookup or the result handling fails
 * @param keys the lookup key values
 * @throws IOException declared by the signature; presumably raised by the reader when the
 *     asynchronous lookup cannot be submitted (verify against {@code asyncGet})
 */
public void eval(CompletableFuture<Collection<RowData>> future, Object... keys) throws IOException {
    final RowData lookupKey = GenericRowData.of(keys);

    // Fast path: answer from the cache when one exists and already knows this key.
    final List<RowData> cached = (cache == null) ? null : cache.getIfPresent(lookupKey);
    if (cached != null) {
        future.complete(cached);
        return;
    }

    final CompletableFuture<List<RowData>> pending = lookupReader.asyncGet(lookupKey);
    pending.handleAsync(
            (rows, error) -> {
                try {
                    if (error != null) {
                        future.completeExceptionally(error);
                        return null;
                    }
                    // Treat "no result" and "empty result" alike so misses are cached too.
                    final List<RowData> outcome =
                            (rows == null || rows.isEmpty())
                                    ? Collections.<RowData>emptyList()
                                    : rows;
                    if (cache != null) {
                        cache.put(lookupKey, outcome);
                    }
                    future.complete(outcome);
                } catch (Throwable failure) {
                    // Never leave the caller's future hanging if the handling itself fails.
                    future.completeExceptionally(failure);
                }
                return null;
            });
}