in computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java [129:228]
/**
 * Runs two worker services and one master service concurrently on a
 * three-thread pool, all configured with job id "local_003", two workers,
 * two partitions and a maximum of two supersteps, then asserts that none
 * of the three tasks recorded a failure.
 *
 * <p>Each task stores its failure (if any) in its own slot of a shared
 * array and always counts the latch down, so this method cannot block
 * forever on a task that failed while being set up or closed.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *                              waiting for the three tasks to finish
 */
public void testServiceWith2Workers() throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    CountDownLatch countDownLatch = new CountDownLatch(3);
    // Slots 0 and 1 belong to the workers, slot 2 to the master
    Throwable[] exceptions = new Throwable[3];
    try {
        pool.submit(() -> this.runWorkerOf2WorkersJob("8086", 0, exceptions,
                                                      countDownLatch));
        pool.submit(() -> this.runWorkerOf2WorkersJob("8087", 1, exceptions,
                                                      countDownLatch));
        pool.submit(() -> this.runMasterOf2WorkersJob(2, exceptions,
                                                      countDownLatch));
        countDownLatch.await();
    } finally {
        // Release the pool threads even if await() was interrupted
        pool.shutdownNow();
    }
    Assert.assertFalse(Arrays.asList(exceptions).toString(),
                       existError(exceptions));
}

/**
 * Initializes, executes and closes one worker of the two-worker job,
 * recording any failure and always counting the latch down.
 *
 * @param port       transport server port; also used to derive the
 *                   worker data dir "[job_&lt;port&gt;]"
 * @param slot       index in {@code exceptions} owned by this worker
 * @param exceptions shared array collecting the failure of each task
 * @param latch      latch counted down exactly once when the task ends
 */
private void runWorkerOf2WorkersJob(String port, int slot,
                                    Throwable[] exceptions,
                                    CountDownLatch latch) {
    WorkerService workerService = null;
    try {
        // Built inside the try so that a setup failure is recorded and
        // still reaches the count-down below
        Config config = UnitTestBase.updateWithRequiredOptions(
            ComputerOptions.JOB_ID, "local_003",
            ComputerOptions.JOB_WORKERS_COUNT, "2",
            ComputerOptions.JOB_PARTITIONS_COUNT, "2",
            ComputerOptions.TRANSPORT_SERVER_PORT, port,
            ComputerOptions.WORKER_DATA_DIRS, "[job_" + port + "]",
            ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
            ComputerOptions.BSP_LOG_INTERVAL, "10000",
            ComputerOptions.BSP_MAX_SUPER_STEP, "2",
            ComputerOptions.WORKER_COMPUTATION_CLASS,
            MockComputation2.class.getName(),
            ComputerOptions.ALGORITHM_RESULT_CLASS,
            DoubleValue.class.getName(),
            ComputerOptions.ALGORITHM_MESSAGE_CLASS,
            DoubleValue.class.getName()
        );
        workerService = new MockWorkerService();
        workerService.init(config);
        workerService.execute();
    } catch (Throwable e) {
        LOG.error("Failed to start worker", e);
        exceptions[slot] = e;
    } finally {
        try {
            if (workerService != null) {
                workerService.close();
            }
        } catch (Throwable e) {
            LOG.error("Failed to close worker", e);
            // Keep the first failure, it is usually the root cause
            if (exceptions[slot] == null) {
                exceptions[slot] = e;
            }
        } finally {
            // Must run even when close() throws, otherwise the test
            // thread would wait on the latch forever
            latch.countDown();
        }
    }
}

/**
 * Initializes, executes and closes the master of the two-worker job,
 * recording any failure and always counting the latch down.
 *
 * @param slot       index in {@code exceptions} owned by the master
 * @param exceptions shared array collecting the failure of each task
 * @param latch      latch counted down exactly once when the task ends
 */
private void runMasterOf2WorkersJob(int slot, Throwable[] exceptions,
                                    CountDownLatch latch) {
    MasterService masterService = null;
    try {
        // Built inside the try so that a setup failure is recorded and
        // still reaches the count-down below
        Config config = UnitTestBase.updateWithRequiredOptions(
            RpcOptions.RPC_SERVER_HOST, "localhost",
            ComputerOptions.JOB_ID, "local_003",
            ComputerOptions.JOB_WORKERS_COUNT, "2",
            ComputerOptions.JOB_PARTITIONS_COUNT, "2",
            ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
            ComputerOptions.BSP_LOG_INTERVAL, "10000",
            ComputerOptions.BSP_MAX_SUPER_STEP, "2",
            ComputerOptions.MASTER_COMPUTATION_CLASS,
            MockMasterComputation2.class.getName(),
            ComputerOptions.ALGORITHM_RESULT_CLASS,
            DoubleValue.class.getName(),
            ComputerOptions.ALGORITHM_MESSAGE_CLASS,
            DoubleValue.class.getName()
        );
        masterService = new MasterService();
        masterService.init(config);
        masterService.execute();
    } catch (Throwable e) {
        LOG.error("Failed to start master", e);
        exceptions[slot] = e;
    } finally {
        try {
            if (masterService != null) {
                masterService.close();
            }
        } catch (Throwable e) {
            LOG.error("Failed to close master", e);
            // Keep the first failure, it is usually the root cause
            if (exceptions[slot] == null) {
                exceptions[slot] = e;
            }
        } finally {
            // Must run even when close() throws, otherwise the test
            // thread would wait on the latch forever
            latch.countDown();
        }
    }
}