public void run()

in flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java [170:205]


        public void run() {
            LOG.info("action watcher start");
            List<Get> recordList = new ArrayList<>(batchSize);
            while (started.get()) {
                try {
                    recordList.clear();
                    Get firstGet = queue.poll(2, TimeUnit.SECONDS);
                    if (firstGet != null) {
                        recordList.add(firstGet);
                        queue.drainTo(recordList, batchSize - 1);
                        LOG.debug("fetch {} records from queue", recordList.size());
                        Map<String, List<Get>> getsByTable = new HashMap<>();
                        for (Get get : recordList) {
                            List<Get> list =
                                    getsByTable.computeIfAbsent(
                                            get.getRecord().getTableIdentifier(),
                                            (s) -> new ArrayList<>());
                            list.add(get);
                        }
                        for (Map.Entry<String, List<Get>> entry : getsByTable.entrySet()) {
                            GetAction getAction = new GetAction(entry.getValue());
                            while (!submit(getAction)) {}
                        }
                    }
                } catch (InterruptedException e) {
                    break;
                } catch (Exception e) {
                    for (Get get : recordList) {
                        if (!get.getFuture().isDone()) {
                            get.getFuture().completeExceptionally(e);
                        }
                    }
                }
            }
            LOG.info("action watcher stop");
        }