in computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java [140:221]
public void testMultiWorkers() throws IOException {
int workerCount = 3;
int partitionCount = 3;
AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
Set<WorkerService> workerServices = Sets.newConcurrentHashSet();
CompletableFuture<Void> masterFuture = new CompletableFuture<>();
Thread masterThread = new Thread(() -> {
String[] args = OptionsBuilder.newInstance()
.withJobId("local_003")
.withAlgorithm(PageRankParams.class)
.withResultName("rank")
.withResultClass(DoubleValue.class)
.withMessageClass(DoubleValue.class)
.withMaxSuperStep(3)
.withComputationClass(COMPUTATION)
.withWorkerCount(workerCount)
.withPartitionCount(partitionCount)
.withRpcServerHost("127.0.0.1")
.withRpcServerPort(0)
.build();
try {
MasterService service = initMaster(args);
masterServiceRef.set(service);
service.execute();
masterFuture.complete(null);
} catch (Throwable e) {
LOG.error("Failed to execute master service", e);
masterFuture.completeExceptionally(e);
}
});
masterThread.setDaemon(true);
Map<Thread, CompletableFuture<Void>> workers = new HashMap<>(workerCount);
for (int i = 1; i <= workerCount; i++) {
String dir = "[jobs-" + i + "]";
CompletableFuture<Void> workerFuture = new CompletableFuture<>();
Thread thread = new Thread(() -> {
String[] args;
args = OptionsBuilder.newInstance()
.withJobId("local_003")
.withAlgorithm(PageRankParams.class)
.withResultName("rank")
.withResultClass(DoubleValue.class)
.withMessageClass(DoubleValue.class)
.withMaxSuperStep(3)
.withComputationClass(COMPUTATION)
.withWorkerCount(workerCount)
.withPartitionCount(partitionCount)
.withTransoprtServerPort(0)
.withDataDirs(dir)
.build();
try {
WorkerService service = initWorker(args);
workerServices.add(service);
service.execute();
workerFuture.complete(null);
} catch (Throwable e) {
LOG.error("Failed to execute worker service", e);
workerFuture.completeExceptionally(e);
}
});
thread.setDaemon(true);
workers.put(thread, workerFuture);
}
masterThread.start();
for (Thread worker : workers.keySet()) {
worker.start();
}
List<CompletableFuture<Void>> futures = new ArrayList<>(workers.values());
futures.add(masterFuture);
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
} finally {
for (WorkerService workerService : workerServices) {
workerService.close();
}
masterServiceRef.get().close();
}
}