public void run()

in computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java [127:181]


        public void run() {
            LOG.info("The send-executor is running");
            Thread thread = Thread.currentThread();
            while (!thread.isInterrupted()) {
                try {
                    int emptyQueueCount = 0;
                    int busyClientCount = 0;
                    for (WorkerChannel channel : channels) {
                        QueuedMessage message = channel.queue.peek();
                        if (message == null) {
                            ++emptyQueueCount;
                            continue;
                        }
                        if (channel.doSend(message)) {
                            // Only consume the message after it is sent
                            channel.queue.take();
                        } else {
                            ++busyClientCount;
                        }
                    }
                    int channelCount = channels.length;
                    /*
                     * If all queues are empty, let send thread wait
                     * until any queue is available
                     */
                    if (emptyQueueCount >= channelCount) {
                        LOG.debug("The send executor was blocked " +
                                  "to wait any queue not empty");
                        QueuedMessageSender.this.waitAnyQueueNotEmpty();
                    }
                    /*
                     * If all clients are busy, let send thread wait
                     * until any client is available
                     */
                    if (busyClientCount >= channelCount) {
                        LOG.debug("The send executor was blocked " +
                                  "to wait any client not busy");
                        QueuedMessageSender.this.waitAnyClientNotBusy();
                    }
                } catch (InterruptedException e) {
                    // Reset interrupted flag
                    thread.interrupt();
                    // Any client is active means that sending task in running
                    if (QueuedMessageSender.this.activeClientCount() > 0) {
                        throw new ComputerException(
                                  "Interrupted when waiting for message " +
                                  "queue not empty");
                    }
                } catch (TransportException e) {
                    // TODO: should handle this in main workflow thread
                    throw new ComputerException("Failed to send message", e);
                }
            }
            LOG.info("The send-executor is terminated");
        }