func()

in store/engine/raft/node.go [471:509]


// applyEntry applies a single raft log entry to this node.
//
// Normal entries are handed to the data store. Configuration-change entries
// are decoded according to their entry type, passed to the raft node through
// ApplyConfChange (the returned ConfState is kept in n.confState), and then
// mirrored into the transport and the n.peers map. Entries of any other type
// are ignored.
//
// It returns the error from applyDataEntry or from decoding the entry data,
// and nil otherwise.
func (n *Node) applyEntry(entry raftpb.Entry) error {
	switch entry.Type {
	case raftpb.EntryNormal:
		return n.dataStore.applyDataEntry(entry)
	case raftpb.EntryConfChange, raftpb.EntryConfChangeV2:
		// The two entry types carry different protobuf messages, so each one
		// must be decoded with its own type. V1 changes are normalised to the
		// V2 shape so that a single code path handles both.
		var cc raftpb.ConfChangeV2
		if entry.Type == raftpb.EntryConfChange {
			var v1 raftpb.ConfChange
			if err := v1.Unmarshal(entry.Data); err != nil {
				return err
			}
			cc = v1.AsV2()
		} else if err := cc.Unmarshal(entry.Data); err != nil {
			return err
		}

		// apply config change to the state machine
		n.confState = *n.raftNode.ApplyConfChange(cc)

		// A V2 message has one Context shared by all of its changes; it is
		// used here as the peer address for every change, which is exactly the
		// V1 behaviour when the message holds a single change.
		// NOTE(review): a multi-change V2 proposal would need per-node
		// addresses encoded in Context - confirm against the proposer.
		for _, change := range cc.Changes {
			switch change.Type {
			case raftpb.ConfChangeAddNode:
				// Skip ourselves and additions that carry no address.
				if change.NodeID != n.config.ID && len(cc.Context) > 0 {
					n.logger.Info("Add the new peer", zap.String("context", string(cc.Context)))
					n.transport.AddPeer(types.ID(change.NodeID), []string{string(cc.Context)})
					n.peers.Store(change.NodeID, string(cc.Context))
				}
			case raftpb.ConfChangeRemoveNode:
				if change.NodeID == n.config.ID {
					n.logger.Info("I have been removed from the cluster, will shutdown")
					n.Close()
					return nil
				}
				n.transport.RemovePeer(types.ID(change.NodeID))
				n.peers.Delete(change.NodeID)
				n.logger.Info("Remove the peer", zap.Uint64("node_id", change.NodeID))
			case raftpb.ConfChangeUpdateNode:
				n.transport.UpdatePeer(types.ID(change.NodeID), []string{string(cc.Context)})
				// Only refresh the address of peers that are already tracked.
				if _, ok := n.peers.Load(change.NodeID); ok {
					n.peers.Store(change.NodeID, string(cc.Context))
				}
			case raftpb.ConfChangeAddLearnerNode:
				// TODO: add the learner node
			}
		}
	}
	return nil
}