in store/engine/raft/node.go [131:176]
// run starts the raft node: it replays the WAL, creates or restarts the
// underlying raft state machine, starts the transport, and then hands control
// to runRaftMessages.
//
// It returns nil immediately if the node is already running. If startup fails
// before the message loop is entered, the running flag is cleared again so a
// later call can retry instead of being mistaken for "already running".
func (n *Node) run() error {
	// Only the caller that flips isRunning from false to true proceeds.
	if !n.isRunning.CompareAndSwap(false, true) {
		return nil
	}
	n.shutdown = make(chan struct{})

	// NOTE(review): the config fields are named *Seconds but are passed to
	// raft as tick counts; confirm the tick interval used by the message loop.
	raftConfig := &raft.Config{
		ID:              n.config.ID,
		HeartbeatTick:   n.config.HeartbeatSeconds,
		ElectionTick:    n.config.ElectionSeconds,
		MaxInflightMsgs: 128,
		MaxSizePerMsg:   10 * 1024 * 1024, // 10 MiB
		Storage:         n.dataStore.raftStorage,
		Logger:          Logger{SugaredLogger: n.logger.Sugar()},
	}

	// The WAL existence check must happen before replayWAL, because replayWAL
	// creates a new WAL when none exists.
	walExists := n.dataStore.walExists()
	snapshot, err := n.dataStore.replayWAL()
	if err != nil {
		// Startup failed: release the running flag so run can be retried.
		n.isRunning.Store(false)
		return err
	}
	n.appliedIndex = snapshot.Metadata.Index
	n.snapshotIndex = snapshot.Metadata.Index
	n.confState = snapshot.Metadata.ConfState

	if n.config.ClusterState == ClusterStateExisting || walExists {
		// Joining an existing cluster or recovering from a previous WAL.
		n.raftNode = raft.RestartNode(raftConfig)
	} else {
		// Fresh bootstrap: peers are only needed on this path. IDs are
		// assigned 1..N by position in config.Peers and the peer string is
		// carried as the peer context.
		peers := make([]raft.Peer, len(n.config.Peers))
		for i, peer := range n.config.Peers {
			peers[i] = raft.Peer{
				ID:      uint64(i + 1),
				Context: []byte(peer),
			}
		}
		n.raftNode = raft.StartNode(raftConfig, peers)
	}

	if err := n.runTransport(); err != nil {
		// Stop the raft node that was just started so its background
		// goroutine does not leak, then release the running flag.
		n.raftNode.Stop()
		n.isRunning.Store(false)
		return err
	}

	n.watchLeaderChange()
	return n.runRaftMessages()
}