public void testStateMachine()

in hugegraph-test/src/main/java/org/apache/hugegraph/core/RoleElectionStateMachineTest.java [141:327]


    public void testStateMachine() throws InterruptedException {
        /*
         * Runs three election nodes against one shared in-memory
         * ClusterRoleStore while a fourth thread repeatedly shuts down the
         * most recently recorded master. Once all four threads have finished,
         * checks that callbacks were recorded and that no epoch was ever
         * claimed by two different master nodes.
         */
        final int nodeCount = 3;
        // One count per node thread plus one for the shutdown thread
        final CountDownLatch stop = new CountDownLatch(nodeCount + 1);
        final int MAX_COUNT = 200;
        final List<LogEntry> logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
        final List<String> masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
        final StateMachineCallback callback = new StateMachineCallback() {

            /*
             * Appends one log entry for the role the node just entered and
             * shuts that node's state machine down once more than MAX_COUNT
             * entries have been collected overall.
             */
            private void record(StateMachineContext context, LogEntry.Role role) {
                Integer epochId = context.epoch();
                String node = context.node();
                logRecords.add(new LogEntry(epochId, node, role));
                if (logRecords.size() > MAX_COUNT) {
                    context.stateMachine().shutdown();
                }
            }

            @Override
            public void onAsRoleMaster(StateMachineContext context) {
                this.record(context, LogEntry.Role.master);
                String node = context.node();
                Utils.println("master node: " + node);
                // Remembered so the shutdown thread can find the latest master
                masterNodes.add(node);
            }

            @Override
            public void onAsRoleWorker(StateMachineContext context) {
                this.record(context, LogEntry.Role.worker);
            }

            @Override
            public void onAsRoleCandidate(StateMachineContext context) {
                this.record(context, LogEntry.Role.candidate);
            }

            @Override
            public void unknown(StateMachineContext context) {
                this.record(context, LogEntry.Role.unknown);
            }

            @Override
            public void onAsRoleAbdication(StateMachineContext context) {
                this.record(context, LogEntry.Role.abdication);
            }

            @Override
            public void error(StateMachineContext context, Throwable e) {
                // Errors are only printed here; they do not fail the test
                Utils.println("state machine error: node " +
                              context.node() +
                              " message " + e.getMessage());
            }
        };

        final List<ClusterRole> clusterRoleLogs = Collections.synchronizedList(new ArrayList<>(100));
        // In-memory store shared by all nodes, keyed by epoch
        final ClusterRoleStore clusterRoleStore = new ClusterRoleStore() {

            // Highest epoch accepted so far
            volatile int epoch = 0;

            final Map<Integer, ClusterRole> data = new ConcurrentHashMap<>();

            // Returns a detached copy so stored and returned roles are never shared
            ClusterRole copy(ClusterRole clusterRole) {
                if (clusterRole == null) {
                    return null;
                }
                return new ClusterRole(clusterRole.node(), clusterRole.url(),
                                       clusterRole.epoch(), clusterRole.clock());
            }

            @Override
            public boolean updateIfNodePresent(ClusterRole clusterRole) {
                // Reject writes that target an epoch older than the current one
                if (clusterRole.epoch() < this.epoch) {
                    return false;
                }

                ClusterRole copy = this.copy(clusterRole);
                ClusterRole newClusterRole = data.compute(copy.epoch(), (key, value) -> {
                    if (copy.epoch() > this.epoch) {
                        // First write of a newer epoch: the writer becomes master
                        this.epoch = copy.epoch();
                        Assert.assertNull(value);
                        clusterRoleLogs.add(copy);
                        Utils.println("The node " + copy + " become new master:");
                        return copy;
                    }

                    Assert.assertEquals(value.epoch(), copy.epoch());
                    if (Objects.equals(value.node(), copy.node()) &&
                        value.clock() <= copy.clock()) {
                        // Same epoch and same node: treat the write as a heartbeat
                        Utils.println("The master node " + copy + " keep heartbeat");
                        clusterRoleLogs.add(copy);
                        if (value.clock() == copy.clock()) {
                            Assert.fail("Clock must increase when same epoch and node id");
                        }
                        return copy;
                    }
                    // A different node (or an older clock) keeps the stored role
                    return value;

                });
                // The update succeeded only if the stored role equals the written one
                return Objects.equals(newClusterRole, copy);
            }

            @Override
            public Optional<ClusterRole> query() {
                return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
            }
        };

        // Slot 0 stays unused so a node id maps directly onto its array index
        RoleElectionStateMachine[] machines = new RoleElectionStateMachine[nodeCount + 1];
        List<Thread> nodes = new ArrayList<>(nodeCount);
        for (int i = 1; i <= nodeCount; i++) {
            final int index = i;
            nodes.add(new Thread(() -> {
                try {
                    Config config = new TestConfig(String.valueOf(index));
                    RoleElectionStateMachine stateMachine =
                            new StandardRoleElectionStateMachine(config, clusterRoleStore);
                    machines[index] = stateMachine;
                    stateMachine.apply(callback);
                } finally {
                    // Always release the latch so stop.await() cannot block
                    // forever when this thread dies with an exception
                    stop.countDown();
                }
            }));
        }
        nodes.forEach(Thread::start);

        Thread randomShutdown = new Thread(() -> {
            try {
                Set<String> dropNodes = new HashSet<>();
                /*
                 * stop.getCount() > 1 means at least one node thread is still
                 * running. Once every node has stopped (for instance through
                 * the MAX_COUNT limit) no new master can appear, so waiting
                 * for further distinct masters would never terminate.
                 */
                while (dropNodes.size() < nodeCount && stop.getCount() > 1) {
                    // Give the cluster up to five seconds to elect a master
                    LockSupport.parkNanos(5_000_000_000L);
                    int size = masterNodes.size();
                    if (size < 1) {
                        continue;
                    }
                    String node = masterNodes.get(size - 1);
                    if (dropNodes.contains(node)) {
                        // Latest master was already dropped; wait for a new one
                        continue;
                    }
                    machines[Integer.parseInt(node)].shutdown();
                    dropNodes.add(node);
                    Utils.println("----shutdown machine " + node);
                }
            } finally {
                stop.countDown();
            }
        });

        randomShutdown.start();
        stop.await();

        Assert.assertGt(0, logRecords.size());
        // Every epoch must map to exactly one master node
        Map<Integer, String> masters = new HashMap<>();
        for (LogEntry entry : logRecords) {
            if (entry.role == LogEntry.Role.master) {
                String lastNode = masters.putIfAbsent(entry.epoch, entry.node);
                if (lastNode != null) {
                    Assert.assertEquals(lastNode, entry.node);
                }
            }
        }

        Assert.assertGt(0, masters.size());
    }