in flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java [170:205]
public void run() {
LOG.info("action watcher start");
List<Get> recordList = new ArrayList<>(batchSize);
while (started.get()) {
try {
recordList.clear();
Get firstGet = queue.poll(2, TimeUnit.SECONDS);
if (firstGet != null) {
recordList.add(firstGet);
queue.drainTo(recordList, batchSize - 1);
LOG.debug("fetch {} records from queue", recordList.size());
Map<String, List<Get>> getsByTable = new HashMap<>();
for (Get get : recordList) {
List<Get> list =
getsByTable.computeIfAbsent(
get.getRecord().getTableIdentifier(),
(s) -> new ArrayList<>());
list.add(get);
}
for (Map.Entry<String, List<Get>> entry : getsByTable.entrySet()) {
GetAction getAction = new GetAction(entry.getValue());
while (!submit(getAction)) {}
}
}
} catch (InterruptedException e) {
break;
} catch (Exception e) {
for (Get get : recordList) {
if (!get.getFuture().isDone()) {
get.getFuture().completeExceptionally(e);
}
}
}
}
LOG.info("action watcher stop");
}