in hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/RoleElectionStateMachineTest.java [142:329]
public void testStateMachine() throws InterruptedException {
final int MAX_COUNT = 200;
CountDownLatch stop = new CountDownLatch(4);
List<LogEntry> logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
List<String> masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
RoleListener callback = new RoleListener() {
@Override
public void onAsRoleMaster(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.master));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
Utils.println("master node: " + node);
masterNodes.add(node);
}
@Override
public void onAsRoleWorker(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.worker));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
}
@Override
public void onAsRoleCandidate(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
}
@Override
public void unknown(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.unknown));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
}
@Override
public void onAsRoleAbdication(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.abdication));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
}
@Override
public void error(StateMachineContext context, Throwable e) {
Utils.println("state machine error: node " +
context.node() + " message " + e.getMessage());
}
};
final List<ClusterRole> clusterRoleLogs = Collections.synchronizedList(
new ArrayList<>(100));
final ClusterRoleStore clusterRoleStore = new ClusterRoleStore() {
volatile int epoch = 0;
final Map<Integer, ClusterRole> data = new ConcurrentHashMap<>();
ClusterRole copy(ClusterRole clusterRole) {
if (clusterRole == null) {
return null;
}
return new ClusterRole(clusterRole.node(), clusterRole.url(),
clusterRole.epoch(), clusterRole.clock());
}
@Override
public boolean updateIfNodePresent(ClusterRole clusterRole) {
if (clusterRole.epoch() < this.epoch) {
return false;
}
ClusterRole copy = this.copy(clusterRole);
ClusterRole newClusterRole = this.data.compute(copy.epoch(), (key, value) -> {
if (copy.epoch() > this.epoch) {
this.epoch = copy.epoch();
Assert.assertNull(value);
clusterRoleLogs.add(copy);
Utils.println("The node " + copy + " become new master:");
return copy;
}
Assert.assertEquals(value.epoch(), copy.epoch());
if (Objects.equals(value.node(), copy.node()) &&
value.clock() <= copy.clock()) {
Utils.println("The master node " + copy + " keep heartbeat");
clusterRoleLogs.add(copy);
if (value.clock() == copy.clock()) {
Assert.fail("Clock must increase when same epoch and node id");
}
return copy;
}
return value;
});
return Objects.equals(newClusterRole, copy);
}
@Override
public Optional<ClusterRole> query() {
return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
}
};
RoleElectionStateMachine[] machines = new RoleElectionStateMachine[4];
Thread node1 = new Thread(() -> {
Config config = new TestConfig("1");
RoleElectionStateMachine stateMachine =
new StandardRoleElectionStateMachine(config, clusterRoleStore);
machines[1] = stateMachine;
stateMachine.start(callback);
stop.countDown();
});
Thread node2 = new Thread(() -> {
Config config = new TestConfig("2");
RoleElectionStateMachine stateMachine =
new StandardRoleElectionStateMachine(config, clusterRoleStore);
machines[2] = stateMachine;
stateMachine.start(callback);
stop.countDown();
});
Thread node3 = new Thread(() -> {
Config config = new TestConfig("3");
RoleElectionStateMachine stateMachine =
new StandardRoleElectionStateMachine(config, clusterRoleStore);
machines[3] = stateMachine;
stateMachine.start(callback);
stop.countDown();
});
node1.start();
node2.start();
node3.start();
Thread randomShutdown = new Thread(() -> {
Set<String> dropNodes = new HashSet<>();
while (dropNodes.size() < 3) {
LockSupport.parkNanos(5_000_000_000L);
int size = masterNodes.size();
if (size < 1) {
continue;
}
String node = masterNodes.get(size - 1);
if (dropNodes.contains(node)) {
continue;
}
machines[Integer.parseInt(node)].shutdown();
dropNodes.add(node);
Utils.println("----shutdown machine " + node);
}
stop.countDown();
});
randomShutdown.start();
stop.await();
Assert.assertGt(0, logRecords.size());
Map<Integer, String> masters = new HashMap<>();
for (LogEntry entry : logRecords) {
if (entry.role == LogEntry.Role.master) {
String lastNode = masters.putIfAbsent(entry.epoch, entry.node);
if (lastNode != null) {
Assert.assertEquals(lastNode, entry.node);
}
}
}
Assert.assertGt(0, masters.size());
}