public static void runAlgorithm()

in computer/computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java [47:149]


    /**
     * Starts one {@link MockWorkerService} and one {@link MasterService} on
     * a two-thread pool, waits until both have finished (or either one has
     * failed), closes them, and asserts that no failure was recorded.
     *
     * <p>Both services share the built-in settings produced by
     * {@link #baseAlgorithmParams(String)}; {@code options} are applied last
     * and therefore override any built-in setting with the same key.
     *
     * @param algorithmParams value stored under
     *                        {@code ComputerOptions.ALGORITHM_PARAMS_CLASS}
     * @param options         extra settings as alternating key/value
     *                        strings; may be {@code null} or empty
     * @throws InterruptedException if the calling thread is interrupted
     *                              while waiting for the services
     */
    public static void runAlgorithm(String algorithmParams, String... options)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        // Slot 0 holds the worker failure, slot 1 the master failure
        Throwable[] exceptions = new Throwable[2];
        AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
        Set<WorkerService> workerServices = Sets.newConcurrentHashSet();

        pool.submit(() -> {
            try {
                Map<String, String> params = baseAlgorithmParams(algorithmParams);
                params.put(ComputerOptions.TRANSPORT_SERVER_PORT.name(),
                           "8086");
                putOptionPairs(params, options);

                Config config = ComputerContextUtil.initContext(params);
                WorkerService workerService = new MockWorkerService();
                // Register before init() so the service gets closed even
                // when init() or execute() fails
                workerServices.add(workerService);
                workerService.init(config);
                workerService.execute();
            } catch (Throwable e) {
                LOG.error("Failed to start worker", e);
                exceptions[0] = e;
                // If worker failed, the master also should quit
                while (countDownLatch.getCount() > 0) {
                    countDownLatch.countDown();
                }
            } finally {
                countDownLatch.countDown();
            }
        });

        pool.submit(() -> {
            try {
                Map<String, String> params = baseAlgorithmParams(algorithmParams);
                params.put(RpcOptions.RPC_SERVER_HOST.name(),
                           "localhost");
                putOptionPairs(params, options);

                Config config = ComputerContextUtil.initContext(params);
                MasterService masterService = new MasterService();
                // Publish before init() so the service gets closed even
                // when init() or execute() fails
                masterServiceRef.set(masterService);
                masterService.init(config);
                masterService.execute();
            } catch (Throwable e) {
                LOG.error("Failed to start master", e);
                exceptions[1] = e;
                // If master failed, the worker also should quit
                while (countDownLatch.getCount() > 0) {
                    countDownLatch.countDown();
                }
            } finally {
                countDownLatch.countDown();
            }
        });

        try {
            countDownLatch.await();
        } finally {
            // Nested try/finally: every cleanup step runs even if an
            // earlier one throws, and the exception still propagates.
            try {
                for (WorkerService workerService : workerServices) {
                    workerService.close();
                }
            } finally {
                try {
                    /*
                     * The reference is still null when the latch was drained
                     * before the master thread created its service (e.g. the
                     * worker failed first, or master config setup failed).
                     */
                    MasterService masterService = masterServiceRef.get();
                    if (masterService != null) {
                        masterService.close();
                    }
                } finally {
                    pool.shutdownNow();
                }
            }
        }

        Assert.assertFalse(Arrays.asList(exceptions).toString(),
                           existError(exceptions));
    }

    /**
     * Builds the settings shared by the worker and the master.
     *
     * @param algorithmParams value stored under
     *                        {@code ComputerOptions.ALGORITHM_PARAMS_CLASS}
     * @return a new mutable map the caller may extend
     */
    private static Map<String, String> baseAlgorithmParams(
                                       String algorithmParams) {
        Map<String, String> params = new HashMap<>();
        params.put(ComputerOptions.JOB_ID.name(),
                   "algo_test_job1");
        params.put(ComputerOptions.JOB_WORKERS_COUNT.name(),
                   "1");
        params.put(ComputerOptions.BSP_REGISTER_TIMEOUT.name(),
                   "100000");
        params.put(ComputerOptions.BSP_LOG_INTERVAL.name(),
                   "30000");
        params.put(ComputerOptions.BSP_MAX_SUPER_STEP.name(),
                   "10");
        params.put(ComputerOptions.ALGORITHM_PARAMS_CLASS.name(),
                   algorithmParams);
        return params;
    }

    /**
     * Copies alternating key/value entries of {@code options} into
     * {@code params}, overriding existing keys.
     *
     * @param params  target map
     * @param options alternating keys and values; {@code null} is a no-op
     * @throws IllegalArgumentException if {@code options} has an odd length
     */
    private static void putOptionPairs(Map<String, String> params,
                                       String[] options) {
        if (options == null) {
            return;
        }
        if (options.length % 2 != 0) {
            throw new IllegalArgumentException(
                      "Options must be key/value pairs, but got " +
                      options.length + " element(s)");
        }
        for (int i = 0; i < options.length; i += 2) {
            params.put(options[i], options[i + 1]);
        }
    }