in computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java [224:291]
public void testOneWorkerWithBusyClient() {
    /*
     * Runs one master and one worker for job "local_002" (PageRank, at most
     * 3 supersteps), each on its own daemon thread, with the worker's send
     * path slowed down through slowSendFunc(). The test passes when both
     * services finish execute() without throwing; any failure is propagated
     * to the test thread through the two futures below.
     */
    AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
    AtomicReference<WorkerService> workerServiceRef = new AtomicReference<>();

    CompletableFuture<Void> masterFuture = new CompletableFuture<>();
    Thread masterThread = new Thread(() -> {
        String[] args = OptionsBuilder.newInstance()
                                      .withJobId("local_002")
                                      .withAlgorithm(PageRankParams.class)
                                      .withResultName("rank")
                                      .withResultClass(DoubleValue.class)
                                      .withMessageClass(DoubleValue.class)
                                      .withMaxSuperStep(3)
                                      .withComputationClass(COMPUTATION)
                                      .withWorkerCount(1)
                                      .withWriteBufferHighMark(10)
                                      .withWriteBufferLowMark(5)
                                      .withRpcServerHost("127.0.0.1")
                                      .withRpcServerPort(0)
                                      .build();
        try (MasterService service = initMaster(args)) {
            // Publish the instance so the test thread can clean it up too
            masterServiceRef.set(service);
            service.execute();
            masterFuture.complete(null);
        } catch (Throwable e) {
            LOG.error("Failed to execute master service", e);
            masterFuture.completeExceptionally(e);
        }
    });
    masterThread.setDaemon(true);

    CompletableFuture<Void> workerFuture = new CompletableFuture<>();
    int transportServerPort = 8998;
    Thread workerThread = new Thread(() -> {
        String[] args = OptionsBuilder.newInstance()
                                      .withJobId("local_002")
                                      .withAlgorithm(PageRankParams.class)
                                      .withResultName("rank")
                                      .withResultClass(DoubleValue.class)
                                      .withMessageClass(DoubleValue.class)
                                      .withMaxSuperStep(3)
                                      .withComputationClass(COMPUTATION)
                                      .withWorkerCount(1)
                                      .withWriteBufferHighMark(20)
                                      .withWriteBufferLowMark(10)
                                      .withTransoprtServerPort(transportServerPort)
                                      .build();
        try (WorkerService service = initWorker(args)) {
            // Publish the instance so the test thread can clean it up too
            workerServiceRef.set(service);
            // Slow down the send rate before the worker starts executing
            this.slowSendFunc(service, transportServerPort);
            service.execute();
            workerFuture.complete(null);
        } catch (Throwable e) {
            LOG.error("Failed to execute worker service", e);
            workerFuture.completeExceptionally(e);
        }
    });
    workerThread.setDaemon(true);

    masterThread.start();
    workerThread.start();

    try {
        // Rethrows (wrapped) if either service failed
        CompletableFuture.allOf(workerFuture, masterFuture).join();
    } finally {
        /*
         * A reference stays null when initWorker()/initMaster() threw before
         * the service was published. Guard against that so a
         * NullPointerException raised here cannot replace the real failure
         * coming out of join().
         * NOTE(review): each service was already closed by its
         * try-with-resources; this second close() presumably relies on
         * close() being idempotent - confirm.
         */
        WorkerService workerService = workerServiceRef.get();
        MasterService masterService = masterServiceRef.get();
        try {
            if (workerService != null) {
                workerService.close();
            }
        } finally {
            // Always attempt the master close, even if the worker close threw
            if (masterService != null) {
                masterService.close();
            }
        }
    }
}