public Object execute()

in hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java [129:186]


        /**
         * Drains the weakly-referenced job queue in batches and runs every
         * batch through {@code executeBatchJob}.
         *
         * <p>Each pass polls up to {@code PAGE_SIZE} jobs from the queue and
         * executes them as one batch, feeding the previous batch result into
         * the next call. The loop ends when any of the following holds:
         * <ul>
         *   <li>the weak reference no longer yields a queue;</li>
         *   <li>the queue is empty;</li>
         *   <li>more than {@code MAX_CONSUME_COUNT} jobs have been consumed;</li>
         *   <li>an interruption has been observed.</li>
         * </ul>
         * In the last three cases {@code consumeComplete()} is called on the
         * queue, followed by {@code reScheduleIfNeeded()} if jobs are still
         * pending.
         *
         * <p>NOTE(review): an exception other than {@link InterruptedException}
         * raised while a batch is being built or executed leaves this method
         * without {@code consumeComplete()} being called - confirm that this
         * is intended.
         *
         * @return the value returned by the last {@code executeBatchJob} call,
         *         or {@code null} if no batch was executed
         * @throws InterruptedException if the thread was found interrupted or a
         *         batch raised it; the thread's interrupt flag is set again
         *         before the exception is thrown
         * @throws Exception any other exception raised while processing a
         *         batch; it is propagated unchanged
         */
        public Object execute() throws Exception {
            Object outcome = null;
            int consumed = 0;
            InterruptedException interruption = null;
            List<EphemeralJob<?>> batch = new ArrayList<>();

            while (true) {
                // Remember a pending interrupt once; isInterrupted() does not clear the flag
                if (interruption == null && Thread.currentThread().isInterrupted()) {
                    interruption = new InterruptedException();
                }

                // Resolve the weak reference on every pass; a missing queue ends the loop
                EphemeralJobQueue queue = this.queueWeakReference.get();
                if (queue == null) {
                    break;
                }

                boolean finished = queue.isEmpty() ||
                                   consumed > MAX_CONSUME_COUNT ||
                                   interruption != null;
                if (finished) {
                    queue.consumeComplete();
                    // Jobs may remain (quota hit, interrupt, or late arrivals)
                    if (!queue.isEmpty()) {
                        queue.reScheduleIfNeeded();
                    }
                    break;
                }

                try {
                    // poll() may yield null even after a non-empty check; skip those
                    while (!queue.isEmpty() && batch.size() < PAGE_SIZE) {
                        EphemeralJob<?> polled = queue.poll();
                        if (polled != null) {
                            batch.add(polled);
                        }
                    }

                    if (!batch.isEmpty()) {
                        consumed += batch.size();
                        outcome = this.executeBatchJob(batch, outcome);
                    }
                } catch (InterruptedException e) {
                    // Keep looping once more so the queue gets its completion handling
                    interruption = e;
                } finally {
                    // The batch list is reused across passes
                    batch.clear();
                }
            }

            if (interruption == null) {
                return outcome;
            }
            Thread.currentThread().interrupt();
            throw interruption;
        }