in computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java [127:181]
public void run() {
    /*
     * Send loop of the send-executor. Until the current thread is
     * interrupted, each pass visits every channel once and tries to send
     * the message at the head of that channel's queue.
     *
     * Throws ComputerException if the thread is interrupted while at least
     * one client is still active, or if sending fails with a
     * TransportException.
     */
    LOG.info("The send-executor is running");
    Thread thread = Thread.currentThread();
    while (!thread.isInterrupted()) {
        try {
            int emptyQueueCount = 0;
            int busyClientCount = 0;
            for (WorkerChannel channel : channels) {
                // Look at the head without removing it, so that a failed
                // send attempt leaves the message in the queue
                QueuedMessage message = channel.queue.peek();
                if (message == null) {
                    ++emptyQueueCount;
                    continue;
                }
                if (channel.doSend(message)) {
                    /*
                     * Only consume the message after it is sent.
                     * NOTE(review): assumes this thread is the only
                     * consumer of the queue, so take() removes the message
                     * that was just peeked - confirm.
                     */
                    channel.queue.take();
                } else {
                    // doSend() returned false: count the client as busy
                    ++busyClientCount;
                }
            }
            int channelCount = channels.length;
            /*
             * If all queues are empty, let send thread wait
             * until any queue is available
             */
            if (emptyQueueCount >= channelCount) {
                LOG.debug("The send executor was blocked " +
                          "to wait any queue not empty");
                QueuedMessageSender.this.waitAnyQueueNotEmpty();
            }
            /*
             * If all clients are busy, let send thread wait
             * until any client is available
             */
            if (busyClientCount >= channelCount) {
                LOG.debug("The send executor was blocked " +
                          "to wait any client not busy");
                QueuedMessageSender.this.waitAnyClientNotBusy();
            }
        } catch (InterruptedException e) {
            // Reset interrupted flag, so the while condition ends the loop
            thread.interrupt();
            // Any client is active means that sending task in running
            if (QueuedMessageSender.this.activeClientCount() > 0) {
                // Keep the original exception as the cause so the
                // interrupted call site is not lost from the stack trace
                throw new ComputerException(
                          "Interrupted when waiting for message " +
                          "queue not empty", e);
            }
        } catch (TransportException e) {
            // TODO: should handle this in main workflow thread
            throw new ComputerException("Failed to send message", e);
        }
    }
    LOG.info("The send-executor is terminated");
}