func()

in store/engine/raft/node.go [252:305]


// runRaftMessages starts the background goroutine that drives the raft node
// and returns immediately; the returned error is always nil.
//
// The goroutine is registered on n.wg and, on every loop iteration, handles
// one of the following events:
//   - the one-second ticker fires: n.raftNode.Tick() is called;
//   - a Ready batch arrives from n.raftNode.Ready(): it is persisted, its
//     messages are sent, its committed entries are applied, and the batch is
//     acknowledged with n.raftNode.Advance();
//   - an error arrives on n.transport.ErrorC: it is logged at Fatal level;
//   - n.shutdown becomes readable: the goroutine exits.
//
// Failures while handling a Ready batch are logged and processing continues
// with the next step.
func (n *Node) runRaftMessages() error {
	n.wg.Add(1)
	go func() {
		ticker := time.NewTicker(time.Second)
		defer func() {
			ticker.Stop()
			n.wg.Done()
		}()

		for {
			select {
			case <-ticker.C:
				n.raftNode.Tick()
			case rd := <-n.raftNode.Ready():
				// Persist first: snapshot (if any), then hard state and
				// entries to the WAL, before anything is sent or applied.
				if !raft.IsEmptySnap(rd.Snapshot) {
					if err := n.dataStore.saveSnapshot(rd.Snapshot); err != nil {
						n.logger.Error("Failed to save snapshot", zap.Error(err))
					}
				}
				if err := n.dataStore.wal.Save(rd.HardState, rd.Entries); err != nil {
					n.logger.Error("Failed to save to wal", zap.Error(err))
				}

				// Mirror the snapshot and the new entries into the raft storage.
				if err := n.applySnapshot(rd.Snapshot); err != nil {
					n.logger.Error("Failed to apply snapshot", zap.Error(err))
				}
				if err := n.dataStore.raftStorage.Append(rd.Entries); err != nil {
					n.logger.Error("Failed to append entries to raft storage", zap.Error(err))
				}

				// Refresh the ConfState carried by outgoing snapshot messages
				// with the node's current one. Index into the slice: ranging
				// by value would only modify a per-iteration copy and leave
				// the messages handed to Send untouched.
				for i := range rd.Messages {
					if rd.Messages[i].Type == raftpb.MsgSnap {
						rd.Messages[i].Snapshot.Metadata.ConfState = n.confState
					}
				}
				n.transport.Send(rd.Messages)

				// Apply the committed entries to the state machine, then let
				// the snapshot trigger decide whether a new snapshot is due.
				n.applyEntries(rd.CommittedEntries)
				if err := n.triggerSnapshotIfNeed(); err != nil {
					n.logger.Error("Failed to trigger snapshot", zap.Error(err))
				}
				n.raftNode.Advance()
			case err := <-n.transport.ErrorC:
				// NOTE(review): if n.logger is a zap logger, Fatal presumably
				// terminates the process, in which case the return below and
				// the deferred cleanup never run — confirm this is intended.
				n.logger.Fatal("Found transport error", zap.Error(err))
				return
			case <-n.shutdown:
				n.logger.Info("Shutting down raft node")
				return
			}
		}
	}()
	return nil
}