in store/engine/raft/node.go [252:305]
func (n *Node) runRaftMessages() error {
n.wg.Add(1)
go func() {
ticker := time.NewTicker(time.Second)
defer func() {
ticker.Stop()
n.wg.Done()
}()
for {
select {
case <-ticker.C:
n.raftNode.Tick()
case rd := <-n.raftNode.Ready():
// Save to wal and storage first
if !raft.IsEmptySnap(rd.Snapshot) {
if err := n.dataStore.saveSnapshot(rd.Snapshot); err != nil {
n.logger.Error("Failed to save snapshot", zap.Error(err))
}
}
if err := n.dataStore.wal.Save(rd.HardState, rd.Entries); err != nil {
n.logger.Error("Failed to save to wal", zap.Error(err))
}
// Replay the entries into the raft storage
if err := n.applySnapshot(rd.Snapshot); err != nil {
n.logger.Error("Failed to apply snapshot", zap.Error(err))
}
_ = n.dataStore.raftStorage.Append(rd.Entries)
for _, msg := range rd.Messages {
if msg.Type == raftpb.MsgApp {
msg.Snapshot.Metadata.ConfState = n.confState
}
}
n.transport.Send(rd.Messages)
// Apply the committed entries to the state machine
n.applyEntries(rd.CommittedEntries)
if err := n.triggerSnapshotIfNeed(); err != nil {
n.logger.Error("Failed to trigger snapshot", zap.Error(err))
}
n.raftNode.Advance()
case err := <-n.transport.ErrorC:
n.logger.Fatal("Found transport error", zap.Error(err))
return
case <-n.shutdown:
n.logger.Info("Shutting down raft node")
return
}
}
}()
return nil
}