in computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java [72:137]
/**
 * Runs one master service and one worker service, each on its own daemon
 * thread, configured with the same job id ("local_002") and algorithm
 * parameters, then blocks until both have finished.
 *
 * <p>Each thread reports its outcome through a {@link CompletableFuture}:
 * normal completion when {@code execute()} returns, exceptional completion
 * when anything is thrown. A failure in either thread therefore surfaces in
 * the test thread through {@code join()}.
 */
public void testOneWorker() {
    AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
    AtomicReference<WorkerService> workerServiceRef = new AtomicReference<>();

    CompletableFuture<Void> masterFuture = new CompletableFuture<>();
    Thread masterThread = new Thread(() -> {
        String[] args = OptionsBuilder.newInstance()
                                      .withJobId("local_002")
                                      .withAlgorithm(PageRankParams.class)
                                      .withResultName("rank")
                                      .withResultClass(DoubleValue.class)
                                      .withMessageClass(DoubleValue.class)
                                      .withMaxSuperStep(3)
                                      .withComputationClass(COMPUTATION)
                                      .withWorkerCount(1)
                                      .withBufferThreshold(50)
                                      .withBufferCapacity(60)
                                      .withRpcServerHost("127.0.0.1")
                                      // NOTE(review): the RPC server port is set twice
                                      // (8611, then 0). Presumably the later call wins
                                      // and one of the two is redundant - confirm against
                                      // OptionsBuilder and drop the unused one.
                                      .withRpcServerPort(8611)
                                      .withRpcServerPort(0)
                                      .build();
        try (MasterService service = initMaster(args)) {
            masterServiceRef.set(service);
            service.execute();
            masterFuture.complete(null);
        } catch (Throwable e) {
            // Catch Throwable (not just Exception), as the worker thread does:
            // an Error escaping here would leave masterFuture uncompleted and
            // the join() below would block forever.
            LOG.error("Failed to execute master service", e);
            masterFuture.completeExceptionally(e);
        }
    });
    masterThread.setDaemon(true);

    CompletableFuture<Void> workerFuture = new CompletableFuture<>();
    Thread workerThread = new Thread(() -> {
        String[] args = OptionsBuilder.newInstance()
                                      .withJobId("local_002")
                                      .withAlgorithm(PageRankParams.class)
                                      .withResultName("rank")
                                      .withResultClass(DoubleValue.class)
                                      .withMessageClass(DoubleValue.class)
                                      .withMaxSuperStep(3)
                                      .withComputationClass(COMPUTATION)
                                      .withWorkerCount(1)
                                      .withBufferThreshold(50)
                                      .withBufferCapacity(60)
                                      .withTransoprtServerPort(0)
                                      .build();
        try (WorkerService service = initWorker(args)) {
            workerServiceRef.set(service);
            service.execute();
            workerFuture.complete(null);
        } catch (Throwable e) {
            LOG.error("Failed to execute worker service", e);
            workerFuture.completeExceptionally(e);
        }
    });
    workerThread.setDaemon(true);

    masterThread.start();
    workerThread.start();

    try {
        // Rethrows (wrapped in CompletionException) if either side failed.
        CompletableFuture.allOf(workerFuture, masterFuture).join();
    } finally {
        // A reference is still null when the matching init call threw before
        // set(...) ran; guard so a NullPointerException here cannot replace
        // the real failure coming out of join().
        // NOTE(review): each service is also closed by the try-with-resources
        // in its own thread, so this is a second close() - presumably close()
        // is idempotent; confirm.
        try {
            WorkerService worker = workerServiceRef.get();
            if (worker != null) {
                worker.close();
            }
        } finally {
            // Nested finally: the master is closed even if closing the worker throws.
            MasterService master = masterServiceRef.get();
            if (master != null) {
                master.close();
            }
        }
    }
}