in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java [100:143]
public void eval(CompletableFuture<Collection<RowData>> future, Object... keys)
throws IOException {
RowData keyRow = GenericRowData.of(keys);
if (cache != null) {
List<RowData> cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
lookupMetrics.incHitCount();
if (LOG.isDebugEnabled()) {
LOG.debug("lookup cache hit for key: {}", keyRow);
}
future.complete(cachedRows);
return;
} else {
lookupMetrics.incMissCount();
}
}
CompletableFuture<List<RowData>> resultFuture = lookupReader.asyncGet(keyRow);
resultFuture.handleAsync(
(resultRows, throwable) -> {
try {
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
if (resultRows == null || resultRows.isEmpty()) {
if (cache != null) {
cache.put(keyRow, Collections.emptyList());
lookupMetrics.incLoadCount();
}
future.complete(Collections.emptyList());
} else {
if (cache != null) {
cache.put(keyRow, resultRows);
lookupMetrics.incLoadCount();
}
future.complete(resultRows);
}
}
} catch (Throwable t) {
future.completeExceptionally(t);
}
return null;
});
}