func()

in store/engine/raft/node.go [131:176]


// run starts the raft node and the components that depend on it.
//
// It returns nil immediately when the node is already marked as running.
// Otherwise it replays the WAL, seeds appliedIndex, snapshotIndex and
// confState from the recovered snapshot metadata, restarts or bootstraps the
// raft node, and finally calls runTransport, watchLeaderChange and
// runRaftMessages.
//
// When replaying the WAL or starting the transport fails, the running flag is
// cleared again before the error is returned, so a later call can retry
// instead of silently returning nil for a node that never came up.
func (n *Node) run() error {
	// Claim the running flag atomically; a repeated or concurrent call bails out.
	if !n.isRunning.CompareAndSwap(false, true) {
		return nil
	}
	n.shutdown = make(chan struct{})

	raftConfig := &raft.Config{
		ID:              n.config.ID,
		HeartbeatTick:   n.config.HeartbeatSeconds,
		ElectionTick:    n.config.ElectionSeconds,
		MaxInflightMsgs: 128,
		MaxSizePerMsg:   10 * 1024 * 1024, // 10 MiB
		Storage:         n.dataStore.raftStorage,
		Logger:          Logger{SugaredLogger: n.logger.Sugar()},
	}

	// WAL existing check must be done before replayWAL since it will create a new WAL if not exists
	walExists := n.dataStore.walExists()
	snapshot, err := n.dataStore.replayWAL()
	if err != nil {
		// Nothing has been started yet; release the flag so run can be retried.
		n.isRunning.Store(false)
		return err
	}
	n.appliedIndex = snapshot.Metadata.Index
	n.snapshotIndex = snapshot.Metadata.Index
	n.confState = snapshot.Metadata.ConfState

	if n.config.ClusterState == ClusterStateExisting || walExists {
		// Joining an existing cluster, or local state was found on disk:
		// restart from storage without passing a bootstrap peer list.
		n.raftNode = raft.RestartNode(raftConfig)
	} else {
		// Fresh cluster: bootstrap with the configured peers. IDs are the
		// 1-based position in the configured list and the peer entry itself
		// is carried as the peer context.
		peers := make([]raft.Peer, len(n.config.Peers))
		for i, peer := range n.config.Peers {
			peers[i] = raft.Peer{
				ID:      uint64(i + 1),
				Context: []byte(peer),
			}
		}
		n.raftNode = raft.StartNode(raftConfig, peers)
	}

	if err := n.runTransport(); err != nil {
		// Undo the partial startup: stop the raft node created above and
		// release the running flag so the node is not reported as running.
		// NOTE(review): anything runTransport set up before failing is not
		// cleaned up here — confirm it releases its own resources on error.
		n.raftNode.Stop()
		n.isRunning.Store(false)
		return err
	}
	n.watchLeaderChange()
	return n.runRaftMessages()
}