public void testServiceWith1Worker()

in computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java [43:126]


    /**
     * Runs one worker service and one master service concurrently on a
     * two-thread pool, waits until both tasks have finished, and then
     * asserts that {@code existError(exceptions)} is false.
     *
     * <p>Slot 0 of {@code exceptions} belongs to the worker task and slot 1
     * to the master task. Each slot receives any failure raised while
     * initialising, executing or closing the corresponding service, so
     * problems on the pool threads are reported by the calling thread
     * instead of being lost inside an ignored {@code Future}.
     *
     * @throws InterruptedException if the calling thread is interrupted
     *                              while waiting for the two tasks
     */
    public void testServiceWith1Worker() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Throwable[] exceptions = new Throwable[2];

        pool.submit(() -> {
            Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_002",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
                ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
                ComputerOptions.BSP_LOG_INTERVAL, "30000",
                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
                ComputerOptions.WORKER_COMPUTATION_CLASS,
                MockComputation.class.getName(),
                ComputerOptions.ALGORITHM_RESULT_CLASS,
                DoubleValue.class.getName(),
                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                DoubleValue.class.getName(),
                ComputerOptions.OUTPUT_CLASS,
                LimitedLogOutput.class.getName()
            );
            WorkerService workerService = new MockWorkerService();
            try {
                workerService.init(config);
                workerService.execute();
            } catch (Throwable e) {
                LOG.error("Failed to start worker", e);
                exceptions[0] = e;
            } finally {
                try {
                    workerService.close();
                    // NOTE(review): the second close looks like a deliberate
                    // check that closing twice is harmless - confirm.
                    workerService.close();
                } catch (Throwable e) {
                    /*
                     * Record the failure rather than calling Assert.fail():
                     * an AssertionError thrown on a pool thread never
                     * reaches the test thread and would skip the count down.
                     */
                    LOG.error("Failed to close worker", e);
                    if (exceptions[0] == null) {
                        exceptions[0] = e;
                    } else {
                        exceptions[0].addSuppressed(e);
                    }
                } finally {
                    // Always release the waiting test thread
                    countDownLatch.countDown();
                }
            }
        });

        pool.submit(() -> {
            Config config = UnitTestBase.updateWithRequiredOptions(
                RpcOptions.RPC_SERVER_HOST, "localhost",
                ComputerOptions.JOB_ID, "local_002",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
                ComputerOptions.BSP_LOG_INTERVAL, "30000",
                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
                ComputerOptions.MASTER_COMPUTATION_CLASS,
                MockMasterComputation.class.getName(),
                ComputerOptions.ALGORITHM_RESULT_CLASS,
                DoubleValue.class.getName(),
                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                DoubleValue.class.getName()
            );
            MasterService masterService = new MasterService();
            try {
                masterService.init(config);
                masterService.execute();
            } catch (Throwable e) {
                LOG.error("Failed to start master", e);
                exceptions[1] = e;
            } finally {
                /*
                 * It must close the service first. The pool will be shutdown
                 * if count down is executed first, and the server thread in
                 * master service will not be closed.
                 */
                try {
                    masterService.close();
                    // NOTE(review): the second close looks like a deliberate
                    // check that closing twice is harmless - confirm.
                    masterService.close();
                } catch (Throwable e) {
                    // Same reasoning as for the worker: record, never throw
                    LOG.error("Failed to close master", e);
                    if (exceptions[1] == null) {
                        exceptions[1] = e;
                    } else {
                        exceptions[1].addSuppressed(e);
                    }
                } finally {
                    // Always release the waiting test thread
                    countDownLatch.countDown();
                }
            }
        });

        try {
            countDownLatch.await();
        } finally {
            // Stop the pool threads even if the wait was interrupted
            pool.shutdownNow();
        }

        Assert.assertFalse(Arrays.asList(exceptions).toString(),
                           existError(exceptions));
    }