public void testMultiWorkers()

in computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java [140:221]


    public void testMultiWorkers() throws IOException {
        int workerCount = 3;
        int partitionCount = 3;
        AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
        Set<WorkerService> workerServices = Sets.newConcurrentHashSet();
        CompletableFuture<Void> masterFuture = new CompletableFuture<>();
        Thread masterThread = new Thread(() -> {
            String[] args = OptionsBuilder.newInstance()
                                          .withJobId("local_003")
                                          .withAlgorithm(PageRankParams.class)
                                          .withResultName("rank")
                                          .withResultClass(DoubleValue.class)
                                          .withMessageClass(DoubleValue.class)
                                          .withMaxSuperStep(3)
                                          .withComputationClass(COMPUTATION)
                                          .withWorkerCount(workerCount)
                                          .withPartitionCount(partitionCount)
                                          .withRpcServerHost("127.0.0.1")
                                          .withRpcServerPort(0)
                                          .build();
            try {
                MasterService service = initMaster(args);
                masterServiceRef.set(service);
                service.execute();
                masterFuture.complete(null);
            } catch (Throwable e) {
                LOG.error("Failed to execute master service", e);
                masterFuture.completeExceptionally(e);
            }
        });
        masterThread.setDaemon(true);

        Map<Thread, CompletableFuture<Void>> workers = new HashMap<>(workerCount);
        for (int i = 1; i <= workerCount; i++) {
            String dir = "[jobs-" + i + "]";

            CompletableFuture<Void> workerFuture = new CompletableFuture<>();
            Thread thread = new Thread(() -> {
                String[] args;
                args = OptionsBuilder.newInstance()
                        .withJobId("local_003")
                        .withAlgorithm(PageRankParams.class)
                        .withResultName("rank")
                        .withResultClass(DoubleValue.class)
                        .withMessageClass(DoubleValue.class)
                        .withMaxSuperStep(3)
                        .withComputationClass(COMPUTATION)
                        .withWorkerCount(workerCount)
                        .withPartitionCount(partitionCount)
                        .withTransoprtServerPort(0)
                        .withDataDirs(dir)
                        .build();
                try {
                    WorkerService service = initWorker(args);
                    workerServices.add(service);
                    service.execute();
                    workerFuture.complete(null);
                } catch (Throwable e) {
                    LOG.error("Failed to execute worker service", e);
                    workerFuture.completeExceptionally(e);
                }
            });
            thread.setDaemon(true);
            workers.put(thread, workerFuture);
        }

        masterThread.start();
        for (Thread worker : workers.keySet()) {
            worker.start();
        }

        List<CompletableFuture<Void>> futures = new ArrayList<>(workers.values());
        futures.add(masterFuture);
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            for (WorkerService workerService : workerServices) {
                workerService.close();
            }
            masterServiceRef.get().close();
        }
    }