in computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java [43:126]
/**
 * Runs one worker service and one master service concurrently on a
 * two-thread pool and verifies that neither of them raised an error.
 *
 * Each task stores any failure (from setup, init, execute or close) in its
 * own slot of {@code exceptions} (0 = worker, 1 = master) instead of
 * asserting on the pool thread: an AssertionError thrown inside a submitted
 * task is captured by the discarded Future and would never fail the test.
 * The latch is always counted down, so the main thread cannot block forever
 * when a task fails.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *                              waiting for both services to finish
 */
public void testServiceWith1Worker() throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CountDownLatch countDownLatch = new CountDownLatch(2);
    Throwable[] exceptions = new Throwable[2];
    pool.submit(() -> {
        WorkerService workerService = null;
        try {
            Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_002",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
                ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
                ComputerOptions.BSP_LOG_INTERVAL, "30000",
                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
                ComputerOptions.WORKER_COMPUTATION_CLASS,
                MockComputation.class.getName(),
                ComputerOptions.ALGORITHM_RESULT_CLASS,
                DoubleValue.class.getName(),
                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                DoubleValue.class.getName(),
                ComputerOptions.OUTPUT_CLASS,
                LimitedLogOutput.class.getName()
            );
            workerService = new MockWorkerService();
            workerService.init(config);
            workerService.execute();
        } catch (Throwable e) {
            LOG.error("Failed to start worker", e);
            exceptions[0] = e;
        } finally {
            try {
                if (workerService != null) {
                    // Close twice on purpose: a repeated close must not fail.
                    workerService.close();
                    workerService.close();
                }
            } catch (Throwable e) {
                LOG.error("Failed to close worker", e);
                // Keep the earlier (primary) failure if there is one.
                if (exceptions[0] == null) {
                    exceptions[0] = e;
                }
            } finally {
                // Always release the latch, even when close() throws.
                countDownLatch.countDown();
            }
        }
    });
    pool.submit(() -> {
        MasterService masterService = null;
        try {
            Config config = UnitTestBase.updateWithRequiredOptions(
                RpcOptions.RPC_SERVER_HOST, "localhost",
                ComputerOptions.JOB_ID, "local_002",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
                ComputerOptions.BSP_LOG_INTERVAL, "30000",
                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
                ComputerOptions.MASTER_COMPUTATION_CLASS,
                MockMasterComputation.class.getName(),
                ComputerOptions.ALGORITHM_RESULT_CLASS,
                DoubleValue.class.getName(),
                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                DoubleValue.class.getName()
            );
            masterService = new MasterService();
            masterService.init(config);
            masterService.execute();
        } catch (Throwable e) {
            LOG.error("Failed to start master", e);
            exceptions[1] = e;
        } finally {
            /*
             * It must close the service first. The pool will be shutdown
             * if count down is executed first, and the server thread in
             * master service will not be closed.
             */
            try {
                if (masterService != null) {
                    // Close twice on purpose: a repeated close must not fail.
                    masterService.close();
                    masterService.close();
                }
            } catch (Throwable e) {
                LOG.error("Failed to close master", e);
                // Keep the earlier (primary) failure if there is one.
                if (exceptions[1] == null) {
                    exceptions[1] = e;
                }
            } finally {
                // Always release the latch, even when close() throws.
                countDownLatch.countDown();
            }
        }
    });
    try {
        countDownLatch.await();
    } finally {
        // Stop the pool threads even if the wait above is interrupted.
        pool.shutdownNow();
    }
    Assert.assertFalse(Arrays.asList(exceptions).toString(),
                       existError(exceptions));
}