in store/engine/raft/node.go [471:509]
// applyEntry applies one raft log entry to this node.
//
// Normal entries are handed to the data store and its result is returned.
// Configuration-change entries are decoded, passed to the raft node via
// ApplyConfChange (the returned ConfState is stored in n.confState), and then
// mirrored into the transport and the local peer table.
//
// EntryConfChange and EntryConfChangeV2 carry different protobuf messages
// (raftpb.ConfChange vs raftpb.ConfChangeV2), so each entry type is decoded
// into its own message type. A V2 entry may carry zero or more single
// changes; each one is applied to the peer bookkeeping in order.
//
// It returns the data store's error for normal entries, the unmarshal error
// for malformed configuration changes, and nil otherwise (including for entry
// types that are not handled here).
func (n *Node) applyEntry(entry raftpb.Entry) error {
	switch entry.Type {
	case raftpb.EntryNormal:
		return n.dataStore.applyDataEntry(entry)
	case raftpb.EntryConfChange:
		var cc raftpb.ConfChange
		if err := cc.Unmarshal(entry.Data); err != nil {
			return err
		}
		n.confState = *n.raftNode.ApplyConfChange(cc)
		n.applyPeerChange(cc.Type, cc.NodeID, cc.Context)
	case raftpb.EntryConfChangeV2:
		var cc raftpb.ConfChangeV2
		if err := cc.Unmarshal(entry.Data); err != nil {
			return err
		}
		n.confState = *n.raftNode.ApplyConfChange(cc)
		// NOTE(review): a V2 message has a single Context shared by all of its
		// changes; it is used here as the peer address for every change, which
		// matches the V1 behavior when the entry carries exactly one change.
		// Confirm how multi-change proposals encode peer addresses.
		for _, change := range cc.Changes {
			if n.applyPeerChange(change.Type, change.NodeID, cc.Context) {
				// This node closed itself; skip the remaining bookkeeping.
				break
			}
		}
	}
	return nil
}

// applyPeerChange mirrors a single membership change into the transport and
// the local peer table. context is used as the peer's address for add and
// update changes. It reports whether this node closed itself because the
// change removed it from the cluster.
func (n *Node) applyPeerChange(changeType raftpb.ConfChangeType, nodeID uint64, context []byte) bool {
	switch changeType {
	case raftpb.ConfChangeAddNode:
		// Only register remote peers, and only when an address was supplied.
		if nodeID != n.config.ID && len(context) > 0 {
			n.logger.Info("Add the new peer", zap.String("context", string(context)))
			n.transport.AddPeer(types.ID(nodeID), []string{string(context)})
			n.peers.Store(nodeID, string(context))
		}
	case raftpb.ConfChangeRemoveNode:
		if nodeID == n.config.ID {
			n.logger.Info("I have been removed from the cluster, will shutdown")
			n.Close()
			return true
		}
		n.transport.RemovePeer(types.ID(nodeID))
		n.peers.Delete(nodeID)
		n.logger.Info("Remove the peer", zap.Uint64("node_id", nodeID))
	case raftpb.ConfChangeUpdateNode:
		n.transport.UpdatePeer(types.ID(nodeID), []string{string(context)})
		// Refresh the stored address only for peers that are already tracked.
		if _, ok := n.peers.Load(nodeID); ok {
			n.peers.Store(nodeID, string(context))
		}
	case raftpb.ConfChangeAddLearnerNode:
		// TODO: add the learner node
	}
	return false
}