public void testOneWorker()

in computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java [72:137]


    public void testOneWorker() {
        AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
        AtomicReference<WorkerService> workerServiceRef = new AtomicReference<>();
        CompletableFuture<Void> masterFuture = new CompletableFuture<>();
        Thread masterThread = new Thread(() -> {
            String[] args = OptionsBuilder.newInstance()
                                          .withJobId("local_002")
                                          .withAlgorithm(PageRankParams.class)
                                          .withResultName("rank")
                                          .withResultClass(DoubleValue.class)
                                          .withMessageClass(DoubleValue.class)
                                          .withMaxSuperStep(3)
                                          .withComputationClass(COMPUTATION)
                                          .withWorkerCount(1)
                                          .withBufferThreshold(50)
                                          .withBufferCapacity(60)
                                          .withRpcServerHost("127.0.0.1")
                                          .withRpcServerPort(8611)
                                          .withRpcServerPort(0)
                                          .build();
            try (MasterService service = initMaster(args)) {
                masterServiceRef.set(service);
                service.execute();
                masterFuture.complete(null);
            } catch (Exception e) {
                LOG.error("Failed to execute master service", e);
                masterFuture.completeExceptionally(e);
            }
        });
        masterThread.setDaemon(true);

        CompletableFuture<Void> workerFuture = new CompletableFuture<>();
        Thread workerThread = new Thread(() -> {
            String[] args = OptionsBuilder.newInstance()
                                          .withJobId("local_002")
                                          .withAlgorithm(PageRankParams.class)
                                          .withResultName("rank")
                                          .withResultClass(DoubleValue.class)
                                          .withMessageClass(DoubleValue.class)
                                          .withMaxSuperStep(3)
                                          .withComputationClass(COMPUTATION)
                                          .withWorkerCount(1)
                                          .withBufferThreshold(50)
                                          .withBufferCapacity(60)
                                          .withTransoprtServerPort(0)
                                          .build();
            try (WorkerService service = initWorker(args)) {
                workerServiceRef.set(service);
                service.execute();
                workerFuture.complete(null);
            } catch (Throwable e) {
                LOG.error("Failed to execute worker service", e);
                workerFuture.completeExceptionally(e);
            }
        });
        workerThread.setDaemon(true);
        masterThread.start();
        workerThread.start();

        try {
            CompletableFuture.allOf(workerFuture, masterFuture).join();
        } finally {
            workerServiceRef.get().close();
            masterServiceRef.get().close();
        }
    }