in hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/PartitionEngine.java [301:432]
/**
 * Moves this raft group's membership toward {@code peers} in stages, retrying across calls.
 *
 * <p>One call advances the change by one or more of these stages:
 * <ol>
 *   <li>Members in {@code peers} that are not yet in the group: a snapshot is triggered, a raft
 *       node is created for each on its store via {@code HgCmdClient#createRaftNode}, and they
 *       are added to the configuration as learners (steps 1-2).</li>
 *   <li>With no members to add, the method checks every learner's replicator state. When all
 *       of them are {@code Replicate}, the learners are removed and re-added as voting peers
 *       (steps 3-4).</li>
 *   <li>Once learners are promoted, or there were none, every current member not in
 *       {@code peers} is dropped from the configuration. Each dropped member's raft node is
 *       then destroyed asynchronously (steps 5-6). If this node is among the removed members,
 *       leadership is transferred away first and the method returns early.</li>
 * </ol>
 * Learners that are no longer requested are dropped in whichever stage runs.
 *
 * @param peers target member endpoint strings. They are compared against this node's
 *              {@code Endpoint#toString()}, so they are presumably in "ip:port" form.
 * @param done  not used by this method
 * @return {@code Status.OK()} if the voting peers already match {@code peers}, or when the
 *         final stage succeeded. {@code TASK_CONTINUE} while learners are being added or are
 *         still catching up; presumably the caller calls again. {@code TASK_ERROR} when a raft
 *         operation failed, or when leadership was transferred and the command must be
 *         resent to the new leader.
 */
public Status changePeers(List<String> peers, final Closure done) {
    // Fast path: only voting peers are compared here, not learners. If ListUtils is
    // commons-collections, isEqualList is also order-sensitive.
    if (ListUtils.isEqualList(peers, RaftUtils.getPeerEndpoints(raftNode))) {
        return Status.OK();
    }
    Status result = HgRaftError.TASK_CONTINUE.toStatus();
    List<String> oldPeers = RaftUtils.getAllEndpoints(raftNode);
    log.info("Raft {} changePeers start, old peer is {}, new peer is {}",
             getGroupId(), oldPeers, peers);
    // Requested members that are not in oldPeers yet (oldPeers presumably holds peers and
    // learners).
    List<String> addPeers = ListUtils.removeAll(peers, oldPeers);
    // Learners that are no longer requested; the target peer set may have changed since they
    // were added.
    List<String> removedPeers = ListUtils.removeAll(RaftUtils.getLearnerEndpoints(raftNode),
                                                    peers);
    HgCmdClient rpcClient = storeEngine.getHgCmdClient();
    // Edit a copy so we can tell at the end whether anything actually changed.
    Configuration oldConf = getCurrentConf();
    Configuration conf = oldConf.copy();
    if (!addPeers.isEmpty()) {
        addPeers.forEach(peer -> conf.addLearner(JRaftUtils.getPeerId(peer)));
        doSnapshot((RaftClosure) status -> {
            log.info("Raft {} snapshot before add learner, result:{}", getGroupId(), status);
        });
        // One latch count per new member; get() waits for every createRaftNode callback.
        FutureClosure closure = new FutureClosure(addPeers.size());
        addPeers.forEach(peer -> Utils.runInThread(() -> {
            // 1. Create the raft node for the new member on its store.
            rpcClient.createRaftNode(peer, partitionManager.getPartitionList(getGroupId()), conf,
                                     status -> {
                                         closure.run(status);
                                         if (!status.isOk()) {
                                             log.error("Raft {} add node {} error {}",
                                                       options.getGroupId(), peer, status);
                                         }
                                     });
        }));
        // NOTE(review): the status from get() is ignored, so a member whose raft node failed
        // to start is still added as a learner below. Also, if a task fails before invoking
        // its callback, this wait presumably never returns. Confirm FutureClosure /
        // HgCmdClient semantics.
        closure.get();
    } else {
        // 3. Promote learners only after every learner's replicator is in Replicate state,
        //    i.e. it has finished catching up.
        boolean snapshotOk = true;
        for (PeerId peerId : raftNode.listLearners()) {
            Replicator.State state = getReplicatorState(peerId);
            // A missing replicator (null state) also fails this check.
            if (state != Replicator.State.Replicate) {
                snapshotOk = false;
                break;
            }
            log.info("Raft {} {} getReplicatorState {}", getGroupId(), peerId, state);
        }
        if (snapshotOk && !conf.listLearners().isEmpty()) {
            // 4. Remove the caught-up learners from the group and re-add them to conf as peers.
            FutureClosure closure = new FutureClosure();
            raftNode.removeLearners(conf.listLearners(), closure);
            if (closure.get().isOk()) {
                conf.listLearners().forEach(peerId -> {
                    conf.addPeer(peerId);
                    conf.removeLearner(peerId);
                });
                result = Status.OK();
            } else {
                // Failed; the caller retries.
                result = HgRaftError.TASK_ERROR.toStatus();
            }
        } else if (snapshotOk) {
            // No learners: only removals remain to be applied.
            result = Status.OK();
        }
    }
    if (result.isOk()) {
        // Promotion is done, so the old members that are no longer requested can go too.
        removedPeers.addAll(ListUtils.removeAll(oldPeers, peers));
        String selfEndpoint =
                this.getRaftNode().getNodeId().getPeerId().getEndpoint().toString();
        // If this node is being removed, hand leadership off before changing membership.
        if (removedPeers.contains(selfEndpoint)) {
            log.info("Raft {} leader is removed, needs to transfer leader {}, conf: {}",
                     getGroupId(), peers, conf);
            // This node is the only voting peer, so there is no one to transfer to yet.
            // Apply conf first so that the new peers join.
            if (raftNode.listPeers().size() == 1) {
                FutureClosure closure = new FutureClosure();
                raftNode.changePeers(conf, closure);
                log.info("Raft {} change peer result:{}", getGroupId(), closure.get());
            }
            var status = this.raftNode.transferLeadershipTo(PeerId.ANY_PEER);
            log.info("Raft {} transfer leader status : {}", getGroupId(), status);
            // Leadership is moving away: the command must be resent to the new leader.
            return HgRaftError.TASK_ERROR.toStatus();
        }
    }
    // Drop each removed member from conf, whether it is currently a learner or a peer.
    removedPeers.forEach(peer -> {
        PeerId removedId = JRaftUtils.getPeerId(peer);
        conf.removeLearner(removedId);
        conf.removePeer(removedId);
    });
    if (!RaftUtils.configurationEquals(oldConf, conf)) {
        // 2. / 5. Apply the new configuration. This either adds the new learners, or
        //    promotes the learners and drops the removed members.
        FutureClosure closure = new FutureClosure();
        raftNode.changePeers(conf, closure);
        Status changeStatus = closure.get();
        if (changeStatus.isOk()) {
            // 6. Asynchronously stop the raft nodes of the removed members.
            removedPeers.forEach(peer -> Utils.runInThread(() -> {
                rpcClient.destroyRaftNode(peer, partitionManager.getPartitionList(getGroupId()),
                                          status -> {
                                              if (!status.isOk()) {
                                                  // TODO: What if it fails?
                                                  log.error("Raft {} destroy node {} error {}",
                                                            options.getGroupId(), peer,
                                                            status);
                                              }
                                          });
            }));
        } else {
            // Failed; the caller retries.
            result = HgRaftError.TASK_ERROR.toStatus();
        }
        log.info("Raft {} changePeers result {}, conf is {}",
                 getRaftNode().getGroupId(), changeStatus, conf);
    }
    log.info("Raft {} changePeers end. {}, result is {}", getGroupId(), peers, result);
    return result;
}