public void testServiceWith2Workers()

in computer/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java [114:207]


    /**
     * Runs two worker services and one master service concurrently, all
     * configured with job id "local_003", and asserts that none of them
     * threw while initializing or executing.
     *
     * <p>Each service runs in its own pool thread. Any {@link Throwable}
     * raised by a task (including one raised while building its config) is
     * logged and stored in a per-task slot, and the shared latch is always
     * counted down, so a failing task cannot leave this method waiting
     * forever on the latch.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *                              waiting for the three tasks to finish
     */
    public void testServiceWith2Workers() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        Throwable[] exceptions = new Throwable[3];

        try {
            // The two workers differ only in transport port and data dir;
            // worker i records its failure in exceptions[i].
            String[] workerPorts = {"8086", "8087"};
            for (int i = 0; i < workerPorts.length; i++) {
                final int slot = i;
                final String port = workerPorts[i];
                pool.submit(() -> {
                    try {
                        // Built inside the try so that a config failure is
                        // recorded and the latch is still counted down.
                        Config config = UnitTestBase.updateWithRequiredOptions(
                                ComputerOptions.JOB_ID, "local_003",
                                ComputerOptions.JOB_WORKERS_COUNT, "2",
                                ComputerOptions.JOB_PARTITIONS_COUNT, "2",
                                ComputerOptions.TRANSPORT_SERVER_PORT, port,
                                ComputerOptions.WORKER_DATA_DIRS,
                                "[job_" + port + "]",
                                ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
                                ComputerOptions.BSP_LOG_INTERVAL, "10000",
                                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
                                ComputerOptions.WORKER_COMPUTATION_CLASS,
                                MockComputation2.class.getName(),
                                ComputerOptions.ALGORITHM_RESULT_CLASS,
                                DoubleValue.class.getName(),
                                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                                DoubleValue.class.getName()
                        );
                        try (WorkerService workerService =
                                     new MockWorkerService()) {
                            workerService.init(config);
                            workerService.execute();
                        }
                    } catch (Throwable e) {
                        LOG.error("Failed to start worker", e);
                        exceptions[slot] = e;
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }

            // The master task records its failure in the last slot.
            pool.submit(() -> {
                try {
                    Config config = UnitTestBase.updateWithRequiredOptions(
                            RpcOptions.RPC_SERVER_HOST, "localhost",
                            ComputerOptions.JOB_ID, "local_003",
                            ComputerOptions.JOB_WORKERS_COUNT, "2",
                            ComputerOptions.JOB_PARTITIONS_COUNT, "2",
                            ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
                            ComputerOptions.BSP_LOG_INTERVAL, "10000",
                            ComputerOptions.BSP_MAX_SUPER_STEP, "2",
                            ComputerOptions.MASTER_COMPUTATION_CLASS,
                            MockMasterComputation2.class.getName(),
                            ComputerOptions.ALGORITHM_RESULT_CLASS,
                            DoubleValue.class.getName(),
                            ComputerOptions.ALGORITHM_MESSAGE_CLASS,
                            DoubleValue.class.getName()
                    );
                    try (MasterService masterService = new MasterService()) {
                        masterService.init(config);
                        masterService.execute();
                    }
                } catch (Throwable e) {
                    LOG.error("Failed to start master", e);
                    exceptions[2] = e;
                } finally {
                    countDownLatch.countDown();
                }
            });

            countDownLatch.await();
        } finally {
            // Always release the pool threads, even if await() is interrupted.
            pool.shutdownNow();
        }

        Assert.assertFalse(Arrays.asList(exceptions).toString(), existError(exceptions));
    }