utils/synclog/log_info.go (139 lines of code) (raw):

package synclog import ( "bytes" "encoding/binary" "errors" "fmt" "io" "os" "sync" "time" "github.com/alibaba/pairec/v2/log" unix "golang.org/x/sys/unix" ) type LogInfo struct { logFile *os.File logFileData []byte metaInfo *MetaInfo syncLog *SyncLog mu sync.Mutex size int } func OpenLogFile(dirPath string, metaInfo *MetaInfo, syncLog *SyncLog) (*LogInfo, error) { logFilePath := fmt.Sprintf("%s/log_sync.data", dirPath) logInfo := &LogInfo{metaInfo: metaInfo, size: 0, syncLog: syncLog} logFileInfo, err := os.Stat(logFilePath) if os.IsNotExist(err) { logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0644) if err != nil { return nil, fmt.Errorf("create log file %s: %v fail", logFilePath, err) } logData, err := unix.Mmap(int(logFile.Fd()), 0, logFileSize*2, unix.PROT_WRITE, unix.MAP_SHARED) if err != nil { return nil, err } logInfo.logFileData = logData logInfo.logFile = logFile logInfo.logFile.Seek(0, io.SeekStart) } else if err != nil { return nil, fmt.Errorf("can't stat log file %s: %v", logFilePath, err) } else { logFile, err := os.OpenFile(logFilePath, os.O_RDWR, 0644) if err != nil { return nil, fmt.Errorf("open log file %s: %v fail", logFilePath, err) } logData, err := unix.Mmap(int(logFile.Fd()), 0, logFileSize*2, unix.PROT_WRITE, unix.MAP_SHARED) if err != nil { return nil, err } logInfo.logFileData = logData logInfo.logFile = logFile size := logFileInfo.Size() log.Info(fmt.Sprintf("open log file, dirPath:%s, meta offset:%d, log file size:%d", dirPath, metaInfo.meta.Offset, size)) if size < int64(metaInfo.meta.Offset) { log.Error(fmt.Sprintf("open log file meta error, dirPath:%s, meta offset:%d, log file size:%d", dirPath, metaInfo.meta.Offset, size)) metaInfo.meta.Offset = 0 } buf := make([]byte, size-int64(metaInfo.meta.Offset)) copy(buf, logInfo.logFileData[metaInfo.meta.Offset:size]) //logInfo.logFileData = logInfo.logFileData[:0] copy(logInfo.logFileData, buf) logInfo.size = len(buf) logInfo.logFile.Seek(int64(logInfo.size), io.SeekStart) unix.Ftruncate(int(logFile.Fd()), int64(logInfo.size)) metaInfo.SaveOffset(0) if len(buf) > 0 { fmt.Printf("reply data size:%d\n", len(buf)) if err := logInfo.replyRead(buf); err != nil { return nil, err } } } return logInfo, nil } func (l *LogInfo) replyRead(data []byte) error { reader := bytes.NewReader(data) offset := int32(0) for { size := int32(-1) err := binary.Read(reader, binary.LittleEndian, &size) if err == io.EOF { break } if err != nil { return err } buf := make([]byte, size) n, err := reader.Read(buf) if err != nil { return err } if n != int(size) { return fmt.Errorf("read data fail") } offset += 4 + size req := &ReadRequest{ Offset: uint32(offset), PlayLoad: buf, } l.syncLog.readCh <- req } return nil } func (l *LogInfo) Write(data []byte) error { l.mu.Lock() defer l.mu.Unlock() n, err := l.logFile.Write(data) if err != nil { return err } l.size += n req := &ReadRequest{} req.Offset = uint32(l.size) req.PlayLoad = make([]byte, len(data)-4) copy(req.PlayLoad, data[4:]) select { case l.syncLog.readCh <- req: default: return errors.New("read chan is full") } if l.size >= logFileSize { for { if l.size == int(l.metaInfo.meta.Offset) { break } time.Sleep(time.Millisecond * 100) } fmt.Println("set file to start write") unix.Ftruncate(int(l.logFile.Fd()), 0) l.logFile.Seek(0, io.SeekStart) l.metaInfo.SaveOffset(0) l.size = 0 } return nil } func (l *LogInfo) Close() error { if err := unix.Munmap(l.logFileData); err != nil { return err } return l.logFile.Close() }