public Status changePeers()

in hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/PartitionEngine.java [301:432]


    /**
     * Performs one round of a raft membership change towards the target {@code peers}.
     *
     * <p>The change is carried out over several invocations; the numbered comments in the
     * body mark the stages:
     * <ol>
     *   <li>ask every endpoint that is not yet a member to create its raft node;</li>
     *   <li>add those endpoints to the group as learners;</li>
     *   <li>on a later invocation, check that every learner's replicator is in
     *       {@code Replicate} state;</li>
     *   <li>remove the learners so they can rejoin as peers;</li>
     *   <li>apply the new configuration (new peers added, old peers removed);</li>
     *   <li>ask the removed endpoints to destroy their raft node.</li>
     * </ol>
     *
     * <p>If this node itself is among the endpoints to remove, leadership is transferred
     * away and an error status is returned so the command is re-sent to the new leader.
     *
     * @param peers endpoints (as strings) the group should finally consist of
     * @param done  not referenced by this method; the outcome is reported only through
     *              the return value
     * @return {@code Status.OK()} when the current peer endpoints already equal
     *         {@code peers} or the change has been applied;
     *         {@code HgRaftError.TASK_CONTINUE} when further rounds are needed (learners were
     *         just added or are not replicating yet);
     *         {@code HgRaftError.TASK_ERROR} when removing learners or changing peers failed,
     *         or after a leadership transfer was requested
     */
    public Status changePeers(List<String> peers, final Closure done) {
        if (ListUtils.isEqualList(peers, RaftUtils.getPeerEndpoints(raftNode))) {
            return Status.OK();
        }

        Status result = HgRaftError.TASK_CONTINUE.toStatus();
        List<String> oldPeers = RaftUtils.getAllEndpoints(raftNode);
        log.info("Raft {} changePeers start, old peer is {}, new peer is {}",
                 getGroupId(), oldPeers, peers);
        // Endpoints requested but not yet part of the group: they must be added.
        List<String> addPeers = ListUtils.removeAll(peers, oldPeers);
        // Learners that are not part of the target: they must be dropped
        // (possible when the target changed while a previous change was in progress).
        List<String> removedPeers = ListUtils.removeAll(RaftUtils.getLearnerEndpoints(raftNode),
                                                        peers);

        HgCmdClient rpcClient = storeEngine.getHgCmdClient();
        // Work on a copy so it can be compared with the current configuration at the end.
        Configuration oldConf = getCurrentConf();
        Configuration conf = oldConf.copy();
        if (!addPeers.isEmpty()) {
            addPeers.forEach(peer -> {
                conf.addLearner(JRaftUtils.getPeerId(peer));
            });

            // Snapshot request is fired here; its outcome is only logged.
            doSnapshot((RaftClosure) status -> {
                log.info("Raft {} snapshot before add learner, result:{}", getGroupId(), status);
            });

            // One callback is expected per new peer.
            FutureClosure closure = new FutureClosure(addPeers.size());
            addPeers.forEach(peer -> Utils.runInThread(() -> {
                // 1. Create a new peer's raft object
                rpcClient.createRaftNode(peer, partitionManager.getPartitionList(getGroupId()),
                                         conf, status -> {
                            closure.run(status);
                            if (!status.isOk()) {
                                log.error("Raft {} add node {} error {}",
                                          options.getGroupId(), peer, status);
                            }
                        });
            }));
            // The returned status is intentionally not inspected; failures are logged above
            // and the result stays TASK_CONTINUE.
            closure.get();
        } else {
            // 3. Check if learner has completed snapshot synchronization
            boolean snapshotOk = true;
            for (PeerId peerId : raftNode.listLearners()) {
                Replicator.State state = getReplicatorState(peerId);
                // Log before the check so the state of a learner that blocks the
                // promotion is recorded as well.
                log.info("Raft {} {} getReplicatorState {}", getGroupId(), peerId, state);
                if (state != Replicator.State.Replicate) {
                    snapshotOk = false;
                    break;
                }
            }
            if (snapshotOk && !conf.listLearners().isEmpty()) {
                // 4. Delete learner, rejoin as peer
                FutureClosure closure = new FutureClosure();
                raftNode.removeLearners(conf.listLearners(), closure);
                if (closure.get().isOk()) {
                    conf.listLearners().forEach(peerId -> {
                        conf.addPeer(peerId);
                        conf.removeLearner(peerId);
                    });
                    result = Status.OK();
                } else {
                    // Failed, retrying
                    result = HgRaftError.TASK_ERROR.toStatus();
                }
            } else if (snapshotOk) {
                result = Status.OK();   // No learner, indicating only delete operations are performed.
            }
        }
        if (result.isOk()) {
            // Sync completed, delete old peer.
            // oldPeers is expected to contain the learner endpoints collected above as well,
            // so skip entries already present: otherwise the same endpoint would be asked
            // to destroy its raft node twice in step 6.
            for (String peer : ListUtils.removeAll(oldPeers, peers)) {
                if (!removedPeers.contains(peer)) {
                    removedPeers.add(peer);
                }
            }
            // Check if leader is deleted, if so, perform leader migration first.
            if (removedPeers.contains(
                    this.getRaftNode().getNodeId().getPeerId().getEndpoint().toString())) {

                log.info("Raft {} leader is removed, needs to transfer leader {}, conf: {}",
                         getGroupId(), peers, conf);
                // only one (that's leader self), should add peer first
                if (raftNode.listPeers().size() == 1) {
                    FutureClosure closure = new FutureClosure();
                    raftNode.changePeers(conf, closure);
                    log.info("Raft {} change peer result:{}", getGroupId(), closure.get());
                }

                var status = this.raftNode.transferLeadershipTo(PeerId.ANY_PEER);
                log.info("Raft {} transfer leader status : {}", getGroupId(), status);
                // Need to resend the command to the new leader
                return HgRaftError.TASK_ERROR.toStatus();
            }
        }

        // Drop every endpoint scheduled for removal from the new configuration,
        // whether it is currently a learner or a peer.
        if (!removedPeers.isEmpty()) {
            removedPeers.forEach(peer -> {
                conf.removeLearner(JRaftUtils.getPeerId(peer));
                conf.removePeer(JRaftUtils.getPeerId(peer));
            });
        }

        if (!RaftUtils.configurationEquals(oldConf, conf)) {
            // 2. The new peer joins as a learner.
            // 5. peer switching, add new peer, delete old peer
            FutureClosure closure = new FutureClosure();
            raftNode.changePeers(conf, closure);
            Status changeStatus = closure.get();
            if (changeStatus.isOk()) {
                if (!removedPeers.isEmpty()) {
                    removedPeers.forEach(peer -> Utils.runInThread(() -> {
                        // 6. Stop the deleted peer
                        rpcClient.destroyRaftNode(peer,
                                                  partitionManager.getPartitionList(getGroupId()),
                                                  status -> {
                                                      if (!status.isOk()) {
                                                          // TODO: What if it fails?
                                                          log.error("Raft {} destroy node {}" +
                                                                    " error {}",
                                                                    options.getGroupId(), peer,
                                                                    status);
                                                      }
                                                  });
                    }));
                }
            } else {
                // Failed, retrying
                result = HgRaftError.TASK_ERROR.toStatus();
            }
            log.info("Raft {} changePeers result {}, conf is {}",
                     getRaftNode().getGroupId(), changeStatus, conf);
        }
        log.info("Raft {} changePeers end. {}, result is {}", getGroupId(), peers, result);
        return result;
    }