hugegraph-core/src/main/java/org/apache/hugegraph/job/algorithm/Consumers.java [41:90]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private static final Logger LOG = Log.logger(Consumers.class);

    private final ExecutorService executor;
    private final Consumer<V> consumer;
    private final Runnable done;

    private final int workers;
    private final int queueSize;
    private final CountDownLatch latch;
    private final BlockingQueue<V> queue;

    private volatile boolean ending = false;
    private volatile Throwable exception = null;

    public Consumers(ExecutorService executor, Consumer<V> consumer) {
        this(executor, consumer, null);
    }

    public Consumers(ExecutorService executor,
                     Consumer<V> consumer, Runnable done) {
        this.executor = executor;
        this.consumer = consumer;
        this.done = done;

        int workers = THREADS;
        if (this.executor instanceof ThreadPoolExecutor) {
            workers = ((ThreadPoolExecutor) this.executor).getCorePoolSize();
        }
        this.workers = workers;
        this.queueSize = QUEUE_WORKER_SIZE * workers;
        this.latch = new CountDownLatch(workers);
        this.queue = new ArrayBlockingQueue<>(this.queueSize);
    }

    public void start(String name) {
        this.ending = false;
        this.exception = null;
        if (this.executor == null) {
            return;
        }
        LOG.info("Starting {} workers[{}] with queue size {}...",
                 this.workers, name, this.queueSize);
        for (int i = 0; i < this.workers; i++) {
            this.executor.submit(new ContextCallable<>(this::runAndDone));
        }
    }

    private Void runAndDone() {
        try {
            this.run();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java [47:96]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private static final Logger LOG = Log.logger(Consumers.class);

    private final ExecutorService executor;
    private final Consumer<V> consumer;
    private final Runnable done;

    private final int workers;
    private final int queueSize;
    private final CountDownLatch latch;
    private final BlockingQueue<V> queue;

    private volatile boolean ending = false;
    private volatile Throwable exception = null;

    public Consumers(ExecutorService executor, Consumer<V> consumer) {
        this(executor, consumer, null);
    }

    public Consumers(ExecutorService executor,
                     Consumer<V> consumer, Runnable done) {
        this.executor = executor;
        this.consumer = consumer;
        this.done = done;

        int workers = THREADS;
        if (this.executor instanceof ThreadPoolExecutor) {
            workers = ((ThreadPoolExecutor) this.executor).getCorePoolSize();
        }
        this.workers = workers;
        this.queueSize = QUEUE_WORKER_SIZE * workers;
        this.latch = new CountDownLatch(workers);
        this.queue = new ArrayBlockingQueue<>(this.queueSize);
    }

    public void start(String name) {
        this.ending = false;
        this.exception = null;
        if (this.executor == null) {
            return;
        }
        LOG.info("Starting {} workers[{}] with queue size {}...",
                 this.workers, name, this.queueSize);
        for (int i = 0; i < this.workers; i++) {
            this.executor.submit(new ContextCallable<>(this::runAndDone));
        }
    }

    private Void runAndDone() {
        try {
            this.run();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



