utils/synclog/sync_log.go (151 lines of code) (raw):

package synclog

import (
	"bytes"
	"encoding/binary"
	"os"
	"sync"
	"time"

	"github.com/pkg/errors"
)

var (
	// logFileSize is 2 << 30 bytes (2 GiB). It is not referenced in this file;
	// presumably it caps the size of a single log file -- confirm at its users.
	logFileSize = 2 << 30 // 2G
)

// ReadFunc consumes the payload of one ReadRequest. loopRead calls it again
// (up to its retry limit) whenever it returns a non-nil error.
type ReadFunc func([]byte) error

// SyncLog ties together the meta file and the log file that live in dirPath
// and runs a background goroutine that feeds queued ReadRequests to readFunc.
type SyncLog struct {
	dirPath  string
	metaInfo *MetaInfo
	logInfo  *LogInfo

	// readCh carries the requests consumed by loopRead. Nothing in this file
	// sends on it; presumably LogInfo does, since OpenLogFile receives the
	// SyncLog -- confirm.
	readCh   chan *ReadRequest
	readFunc ReadFunc

	// closeCh is closed by Close to stop loopRead.
	closeCh chan struct{}
}

// NewSyncLog returns a SyncLog rooted at dirPath that hands replayed payloads
// to f. The directory is created when it does not exist yet. No file is opened
// and no goroutine is started until Init is called.
func NewSyncLog(dirPath string, f ReadFunc) *SyncLog {
	if _, err := os.Stat(dirPath); os.IsNotExist(err) {
		// The constructor has no error return, so a MkdirAll failure is
		// deliberately dropped here; presumably it resurfaces when Init tries
		// to open files under dirPath.
		_ = os.MkdirAll(dirPath, 0744)
	}

	return &SyncLog{
		dirPath:  dirPath,
		readCh:   make(chan *ReadRequest, 10000),
		closeCh:  make(chan struct{}, 1),
		readFunc: f,
	}
}

// Init opens the meta file and the log file, fsyncs the directory so the
// (possibly new) directory entries are durable, and starts the loopRead
// goroutine. It returns the first error encountered; in that case the
// goroutine is not started.
func (l *SyncLog) Init() error {
	info, err := OpenMetaFile(l.dirPath)
	if err != nil {
		return err
	}
	l.metaInfo = info

	logInfo, err := OpenLogFile(l.dirPath, info, l)
	if err != nil {
		return err
	}
	l.logInfo = logInfo

	if err := syncDir(l.dirPath); err != nil {
		return err
	}

	go l.loopRead()
	return nil
}

// Write appends one record to the log. The record is framed as a 4-byte
// little-endian length followed by the bytes returned by loger.Format().
// The error, if any, is the one returned by the underlying LogInfo.Write.
func (l *SyncLog) Write(loger SyncLoger) error {
	playData := loger.Format()

	buf := bufferPool.Get().(*bytes.Buffer)
	// Deferred so the buffer goes back to the pool on the error path as well.
	defer bufferPool.Put(buf)
	buf.Reset()

	var sizePrefix [4]byte
	binary.LittleEndian.PutUint32(sizePrefix[:], uint32(len(playData)))
	buf.Write(sizePrefix[:])
	buf.Write(playData)

	return l.logInfo.Write(buf.Bytes())
}

// loopRead is the background consumer started by Init. For every request
// received on readCh it calls readFunc, sleeping one second after each failed
// attempt, for at most retryTimes attempts, and then records the request's
// offset in the meta file. It returns once closeCh is closed.
func (l *SyncLog) loopRead() {
	const retryTimes = 10

	for {
		select {
		case <-l.closeCh:
			return
		case req := <-l.readCh:
			for attempt := 0; attempt < retryTimes; attempt++ {
				if l.readFunc(req.PlayLoad) == nil {
					break
				}
				time.Sleep(time.Second)
			}
			// NOTE(review): the offset is saved even when every attempt
			// failed, so a persistently failing record is skipped rather than
			// replayed later -- confirm this best-effort behaviour is intended.
			l.metaInfo.SaveOffset(req.Offset)
		}
	}
}

// Close signals loopRead to stop and closes the meta file and the log file.
// Both files are always closed, even if closing the first one fails; the first
// error encountered is returned. Close must be called at most once: a second
// call panics on the already-closed channel.
func (l *SyncLog) Close() error {
	close(l.closeCh)

	var firstErr error
	// The nil guards keep Close usable when Init was never called or failed
	// before both members were set.
	if l.metaInfo != nil {
		if err := l.metaInfo.Close(); err != nil {
			firstErr = err
		}
	}
	if l.logInfo != nil {
		if err := l.logInfo.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// SyncLoger is a record that can be serialized into, and restored from, the
// byte payload stored in the sync log.
type SyncLoger interface {
	// Format returns the serialized form of the record.
	Format() []byte
	// Parse restores the record from bytes previously produced by Format.
	Parse([]byte) error
}

// SyncLogKVItem is a SyncLoger holding a single key/value pair.
type SyncLogKVItem struct {
	Key   []byte
	Value []byte
}

// bufferPool recycles the scratch buffers used by Write.
var bufferPool = sync.Pool{
	New: func() any {
		return bytes.NewBuffer(nil)
	},
}

// Format serializes the item as
//
//	int32 LE len(Key) | Key | int32 LE len(Value) | Value
//
// The returned slice is freshly allocated and owned by the caller. It must not
// alias a pooled buffer: Write takes a buffer from bufferPool immediately after
// calling Format and would overwrite the payload while copying it.
func (kv *SyncLogKVItem) Format() []byte {
	out := make([]byte, 4+len(kv.Key)+4+len(kv.Value))

	binary.LittleEndian.PutUint32(out, uint32(len(kv.Key)))
	pos := 4 + copy(out[4:], kv.Key)

	binary.LittleEndian.PutUint32(out[pos:], uint32(len(kv.Value)))
	copy(out[pos+4:], kv.Value)

	return out
}

// Parse decodes data, laid out as produced by Format, into kv.Key and
// kv.Value. Both fields receive copies, so data may be reused afterwards.
// Zero-length keys and values are accepted. An error is returned, and kv is
// left untouched, when data is truncated or a size prefix is negative.
// Bytes following the value are ignored.
func (kv *SyncLogKVItem) Parse(data []byte) error {
	key, rest, err := decodeSizedField(data, "key")
	if err != nil {
		return err
	}
	value, _, err := decodeSizedField(rest, "value")
	if err != nil {
		return err
	}

	kv.Key, kv.Value = key, value
	return nil
}

// decodeSizedField reads one field stored as a 4-byte little-endian int32
// length followed by that many bytes. It returns a copy of the field and the
// bytes of data that follow it; what names the field in error messages.
func decodeSizedField(data []byte, what string) (field, rest []byte, err error) {
	const prefixLen = 4

	if len(data) < prefixLen {
		return nil, nil, errors.Errorf("sync log item: truncated %s size: have %d bytes, need %d", what, len(data), prefixLen)
	}
	size := int32(binary.LittleEndian.Uint32(data))
	body := data[prefixLen:]

	if size < 0 {
		return nil, nil, errors.Errorf("sync log item: negative %s size %d", what, size)
	}
	if int(size) > len(body) {
		return nil, nil, errors.Errorf("sync log item: truncated %s: have %d bytes, need %d", what, len(body), size)
	}

	field = make([]byte, size)
	copy(field, body)
	return field, body[size:], nil
}

// ReadRequest is one unit of work for loopRead: PlayLoad is passed to the
// ReadFunc and Offset is saved to the meta file afterwards.
type ReadRequest struct {
	Offset   uint32
	PlayLoad []byte
}

// syncDir fsyncs the directory dir.
//
// When you create or delete a file, you have to ensure the directory entry for
// the file is synced in order to guarantee the file is visible (if the system
// crashes). (See the man page for fsync, or see
// https://github.com/coreos/etcd/issues/6368 for an example.)
func syncDir(dir string) error {
	f, err := os.Open(dir)
	if err != nil {
		return errors.Wrapf(err, "While opening directory: %s.", dir)
	}

	syncErr := f.Sync()
	// Close unconditionally so the descriptor is released even if Sync failed;
	// the Sync error takes precedence when both fail.
	closeErr := f.Close()
	if syncErr != nil {
		return errors.Wrapf(syncErr, "While syncing directory: %s.", dir)
	}
	// errors.Wrapf returns nil when closeErr is nil.
	return errors.Wrapf(closeErr, "While closing directory: %s.", dir)
}