in computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java [47:149]
/**
 * Runs one {@link MockWorkerService} and one {@link MasterService}
 * concurrently on a two-thread pool, waits until both have finished (or
 * either has failed), closes the services, and asserts that neither task
 * recorded an exception.
 *
 * @param algorithmParams value stored under
 *                        {@code ComputerOptions.ALGORITHM_PARAMS_CLASS}
 *                        for both the worker and the master
 * @param options         alternating key/value pairs that are put into the
 *                        configuration of both services after the built-in
 *                        entries, so they override them; may be
 *                        {@code null}
 * @throws InterruptedException if the calling thread is interrupted while
 *                              waiting for the two tasks
 */
public static void runAlgorithm(String algorithmParams, String... options)
                                throws InterruptedException {
    final Logger log = Log.logger(AlgorithmTestBase.class);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CountDownLatch countDownLatch = new CountDownLatch(2);
    // Slot 0 holds the worker failure, slot 1 the master failure
    Throwable[] exceptions = new Throwable[2];
    AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
    Set<WorkerService> workerServices = Sets.newConcurrentHashSet();

    pool.submit(() -> {
        try {
            Map<String, String> params = buildServiceParams(
                    ComputerOptions.TRANSPORT_SERVER_PORT.name(), "8086",
                    algorithmParams, options);
            Config config = ComputerContextUtil.initContext(params);
            WorkerService workerService = new MockWorkerService();
            // Register before init so the service is closed even if
            // init/execute fails
            workerServices.add(workerService);
            workerService.init(config);
            workerService.execute();
        } catch (Throwable e) {
            log.error("Failed to start worker", e);
            exceptions[0] = e;
            // If worker failed, the master also should quit
            while (countDownLatch.getCount() > 0) {
                countDownLatch.countDown();
            }
        } finally {
            countDownLatch.countDown();
        }
    });

    pool.submit(() -> {
        try {
            Map<String, String> params = buildServiceParams(
                    RpcOptions.RPC_SERVER_HOST.name(), "localhost",
                    algorithmParams, options);
            Config config = ComputerContextUtil.initContext(params);
            MasterService masterService = new MasterService();
            masterServiceRef.set(masterService);
            masterService.init(config);
            masterService.execute();
        } catch (Throwable e) {
            log.error("Failed to start master", e);
            exceptions[1] = e;
            // If master failed, the worker also should quit
            while (countDownLatch.getCount() > 0) {
                countDownLatch.countDown();
            }
        } finally {
            countDownLatch.countDown();
        }
    });

    try {
        countDownLatch.await();
    } finally {
        // Nested try/finally: every cleanup step runs even if an earlier
        // one throws, and the pool threads are always stopped.
        try {
            for (WorkerService workerService : workerServices) {
                workerService.close();
            }
        } finally {
            try {
                // The reference is still null when the master task failed
                // before creating the service, or when the worker failed
                // first and released the latch early.
                MasterService masterService = masterServiceRef.get();
                if (masterService != null) {
                    masterService.close();
                }
            } finally {
                pool.shutdownNow();
            }
        }
    }
    Assert.assertFalse(Arrays.asList(exceptions).toString(),
                       existError(exceptions));
}

/**
 * Builds the configuration map for one service started by
 * {@link #runAlgorithm(String, String...)}.
 *
 * @param roleKey         key of the single entry that differs between the
 *                        worker and the master
 * @param roleValue       value stored under {@code roleKey}
 * @param algorithmParams value stored under
 *                        {@code ComputerOptions.ALGORITHM_PARAMS_CLASS}
 * @param options         alternating key/value pairs applied last so they
 *                        override the entries above; may be {@code null}
 * @return a new mutable map with all entries
 */
private static Map<String, String> buildServiceParams(String roleKey,
                                                      String roleValue,
                                                      String algorithmParams,
                                                      String[] options) {
    Map<String, String> params = new HashMap<>();
    params.put(roleKey, roleValue);
    params.put(ComputerOptions.JOB_ID.name(), "algo_test_job1");
    params.put(ComputerOptions.JOB_WORKERS_COUNT.name(), "1");
    params.put(ComputerOptions.BSP_REGISTER_TIMEOUT.name(), "100000");
    params.put(ComputerOptions.BSP_LOG_INTERVAL.name(), "30000");
    params.put(ComputerOptions.BSP_MAX_SUPER_STEP.name(), "10");
    params.put(ComputerOptions.ALGORITHM_PARAMS_CLASS.name(),
               algorithmParams);
    if (options != null) {
        for (int i = 0; i < options.length; i += 2) {
            params.put(options[i], options[i + 1]);
        }
    }
    return params;
}