public void testServiceWith2Workers()

in computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java [129:228]


    public void testServiceWith2Workers() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        Throwable[] exceptions = new Throwable[3];

        pool.submit(() -> {
            Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_003",
                ComputerOptions.JOB_WORKERS_COUNT, "2",
                ComputerOptions.JOB_PARTITIONS_COUNT, "2",
                ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
                ComputerOptions.WORKER_DATA_DIRS, "[job_8086]",
                ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
                ComputerOptions.BSP_LOG_INTERVAL, "10000",
                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
                ComputerOptions.WORKER_COMPUTATION_CLASS,
                MockComputation2.class.getName(),
                ComputerOptions.ALGORITHM_RESULT_CLASS,
                DoubleValue.class.getName(),
                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                DoubleValue.class.getName()
            );
            WorkerService workerService = new MockWorkerService();
            try {
                workerService.init(config);
                workerService.execute();
            } catch (Throwable e) {
                LOG.error("Failed to start worker", e);
                exceptions[0] = e;
            } finally {
                workerService.close();
                countDownLatch.countDown();
            }
        });

        pool.submit(() -> {
            Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_003",
                ComputerOptions.JOB_WORKERS_COUNT, "2",
                ComputerOptions.JOB_PARTITIONS_COUNT, "2",
                ComputerOptions.TRANSPORT_SERVER_PORT, "8087",
                ComputerOptions.WORKER_DATA_DIRS, "[job_8087]",
                ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
                ComputerOptions.BSP_LOG_INTERVAL, "10000",
                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
                ComputerOptions.WORKER_COMPUTATION_CLASS,
                MockComputation2.class.getName(),
                ComputerOptions.ALGORITHM_RESULT_CLASS,
                DoubleValue.class.getName(),
                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                DoubleValue.class.getName()
            );
            WorkerService workerService = new MockWorkerService();
            try {
                workerService.init(config);
                workerService.execute();
            } catch (Throwable e) {
                LOG.error("Failed to start worker", e);
                exceptions[1] = e;
            } finally {
                workerService.close();
                countDownLatch.countDown();
            }
        });

        pool.submit(() -> {
            Config config = UnitTestBase.updateWithRequiredOptions(
                RpcOptions.RPC_SERVER_HOST, "localhost",
                ComputerOptions.JOB_ID, "local_003",
                ComputerOptions.JOB_WORKERS_COUNT, "2",
                ComputerOptions.JOB_PARTITIONS_COUNT, "2",
                ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
                ComputerOptions.BSP_LOG_INTERVAL, "10000",
                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
                ComputerOptions.MASTER_COMPUTATION_CLASS,
                MockMasterComputation2.class.getName(),
                ComputerOptions.ALGORITHM_RESULT_CLASS,
                DoubleValue.class.getName(),
                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                DoubleValue.class.getName()
            );
            MasterService masterService = new MasterService();
            try {
                masterService.init(config);
                masterService.execute();
            } catch (Throwable e) {
                LOG.error("Failed to start master", e);
                exceptions[2] = e;
            } finally {
                masterService.close();
                countDownLatch.countDown();
            }
        });

        countDownLatch.await();
        pool.shutdownNow();

        Assert.assertFalse(Arrays.asList(exceptions).toString(),
                           existError(exceptions));
    }