in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java [129:186]
/**
 * Drains the referenced queue page by page and runs every page as one batch.
 *
 * <p>Each pass polls up to {@code PAGE_SIZE} jobs from the queue and hands them
 * to {@code executeBatchJob} together with the result of the previous batch.
 * Draining ends when the queue reference resolves to {@code null}, the queue
 * is empty, more than {@code MAX_CONSUME_COUNT} jobs have been consumed, or an
 * interruption has been observed. Except in the {@code null}-reference case,
 * the queue is told that consumption is complete and, if jobs are still
 * pending, is asked to reschedule.
 *
 * @return the value returned by the last executed batch, or {@code null} if no
 *         batch was executed
 * @throws InterruptedException if the thread was interrupted before or during
 *         a batch; the interrupt flag is set again before the exception is thrown
 * @throws Exception presumably anything else raised by {@code executeBatchJob};
 *         such exceptions are not caught here
 */
public Object execute() throws Exception {
    Object outcome = null;
    int consumed = 0;
    InterruptedException interruption = null;
    List<EphemeralJob<?>> page = new ArrayList<>();

    while (true) {
        // Latch a pending interrupt once; isInterrupted() does not clear the flag
        if (interruption == null && Thread.currentThread().isInterrupted()) {
            interruption = new InterruptedException();
        }

        EphemeralJobQueue queue = this.queueWeakReference.get();
        if (queue == null) {
            // The reference no longer yields a queue: nothing to drain or notify
            break;
        }

        boolean keepDraining = !queue.isEmpty() &&
                               consumed <= MAX_CONSUME_COUNT &&
                               interruption == null;
        if (!keepDraining) {
            // Mark this run as finished first, then hand leftovers to a new run
            queue.consumeComplete();
            if (!queue.isEmpty()) {
                queue.reScheduleIfNeeded();
            }
            break;
        }

        try {
            // Collect one page of jobs, skipping any null returned by poll()
            while (!queue.isEmpty() && page.size() < PAGE_SIZE) {
                EphemeralJob<?> polled = queue.poll();
                if (polled != null) {
                    page.add(polled);
                }
            }

            if (!page.isEmpty()) {
                consumed += page.size();
                outcome = this.executeBatchJob(page, outcome);
            }
        } catch (InterruptedException e) {
            // Remember the interruption; the next pass performs the shutdown steps
            interruption = e;
        } finally {
            // The page list is reused across passes, so always empty it
            page.clear();
        }
    }

    if (interruption == null) {
        return outcome;
    }

    // Restore the interrupt flag for callers before propagating
    Thread.currentThread().interrupt();
    throw interruption;
}