in computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java [259:286]
/**
 * Sends a control message of the given type to each of the given workers
 * and blocks until every send future has completed.
 *
 * <p>All messages are submitted first via {@code this.sender.send(...)};
 * only afterwards are the returned futures awaited one by one. The wait
 * timeout is {@code timeoutFinishSession()} for {@link MessageType#FINISH}
 * and {@code timeoutSyncRequest()} for every other type. Note that the
 * timeout is applied to each future individually, not to the whole batch.
 *
 * @param workerIds ids of the workers that should receive the message
 * @param type      the control message type to send
 * @throws ComputerException if the calling thread is interrupted while
 *         sending or waiting, if a future does not complete within the
 *         timeout, or if a future completes exceptionally; the original
 *         exception is always attached as the cause
 */
private void sendControlMessageToWorkers(Set<Integer> workerIds,
                                         MessageType type) {
    List<CompletableFuture<Void>> futures =
            new ArrayList<>(workerIds.size());
    try {
        for (Integer workerId : workerIds) {
            futures.add(this.sender.send(workerId, type));
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe it,
        // and keep the cause so the stack trace is not lost.
        Thread.currentThread().interrupt();
        throw new ComputerException("Interrupted when waiting to " +
                                    "send message async", e);
    }

    long timeout = type == MessageType.FINISH ?
                   this.transportConf.timeoutFinishSession() :
                   this.transportConf.timeoutSyncRequest();
    try {
        for (CompletableFuture<Void> future : futures) {
            // Each future gets the full timeout on its own.
            future.get(timeout, TimeUnit.MILLISECONDS);
        }
    } catch (TimeoutException e) {
        throw new ComputerException("Timeout(%sms) to wait for " +
                                    "controlling message(%s) to finished",
                                    e, timeout, type);
    } catch (InterruptedException e) {
        // Restore the interrupt flag before converting the exception.
        Thread.currentThread().interrupt();
        throw new ComputerException("Failed to wait for controlling " +
                                    "message(%s) to finished", e, type);
    } catch (ExecutionException e) {
        throw new ComputerException("Failed to wait for controlling " +
                                    "message(%s) to finished", e, type);
    }
}