in computer-test/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdBspTest.java [123:170]
/**
 * Drives one master task and one worker task concurrently through the
 * superstep barrier sequence (prepare done, compute done, step done) and
 * waits for both of them to finish.
 *
 * The master iterates {@code maxSuperStep} times and marks the aggregated
 * SuperstepStat inactive on the last iteration; the worker keeps iterating
 * until the SuperstepStat returned by the master is no longer active.
 *
 * Any Throwable raised inside either task is recorded and rethrown from the
 * test thread as an AssertionError, so a failing task makes the test fail
 * instead of leaving it blocked on the latch.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *                              waiting for the two tasks
 */
public void testIterate() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    // Holds the first failure seen by either task. Fully qualified so that
    // this method does not depend on an additional import.
    java.util.concurrent.atomic.AtomicReference<Throwable> failure =
            new java.util.concurrent.atomic.AtomicReference<>();

    // Master side.
    this.executorService.submit(() -> {
        try {
            for (int i = 0; i < this.maxSuperStep; i++) {
                this.bsp4Master.waitWorkersStepPrepareDone(i);
                this.bsp4Master.masterStepPrepareDone(i);
                this.bsp4Master.waitWorkersStepComputeDone(i);
                this.bsp4Master.masterStepComputeDone(i);
                List<WorkerStat> list =
                        this.bsp4Master.waitWorkersStepDone(i);
                SuperstepStat superstepStat = new SuperstepStat();
                for (WorkerStat workerStat1 : list) {
                    superstepStat.increase(workerStat1);
                }
                // Last superstep: tell the worker to stop iterating.
                if (i == this.maxSuperStep - 1) {
                    superstepStat.inactivate();
                }
                this.bsp4Master.masterStepDone(i, superstepStat);
            }
        } catch (Throwable e) {
            failure.compareAndSet(null, e);
            // The submitted task's exception would otherwise be swallowed
            // by the discarded Future. Release the latch completely so the
            // test thread can report the failure without waiting for the
            // peer task, which may be blocked waiting on this one.
            while (countDownLatch.getCount() > 0L) {
                countDownLatch.countDown();
            }
            return;
        }
        countDownLatch.countDown();
    });

    // Worker side.
    this.executorService.submit(() -> {
        try {
            int superstep = -1;
            SuperstepStat superstepStat = null;
            while (superstepStat == null || superstepStat.active()) {
                superstep++;
                this.bsp4Worker.workerStepPrepareDone(superstep);
                this.bsp4Worker.waitMasterStepPrepareDone(superstep);
                this.bsp4Worker.workerStepComputeDone(superstep);
                this.bsp4Worker.waitMasterStepComputeDone(superstep);
                PartitionStat stat1 = new PartitionStat(0, 100L, 200L, 50L);
                PartitionStat stat2 = new PartitionStat(1, 200L, 300L, 80L);
                WorkerStat workerStatInSuperstep = new WorkerStat();
                workerStatInSuperstep.add(stat1);
                workerStatInSuperstep.add(stat2);
                // Sleep some time to simulate the worker do computation.
                UnitTestBase.sleep(100L);
                this.bsp4Worker.workerStepDone(superstep,
                                               workerStatInSuperstep);
                superstepStat =
                        this.bsp4Worker.waitMasterStepDone(superstep);
            }
        } catch (Throwable e) {
            failure.compareAndSet(null, e);
            // Same reasoning as on the master side: unblock the test
            // thread immediately so the failure is reported.
            while (countDownLatch.getCount() > 0L) {
                countDownLatch.countDown();
            }
            return;
        }
        countDownLatch.countDown();
    });

    countDownLatch.await();

    // The latch reaches zero either because both tasks completed normally
    // or because one of them failed; distinguish the two cases here.
    Throwable cause = failure.get();
    if (cause != null) {
        throw new AssertionError("BSP iteration failed in a background task",
                                 cause);
    }
}