log/rollwriter/roll_writer.go (312 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package rollwriter provides a high performance rolling file log.
// It can coordinate with any logs which depends on io.Writer, such as golang standard log.
// Main features:
// 1. support rolling logs by file size.
// 2. support rolling logs by datetime.
// 3. support scavenging expired or useless logs.
// 4. support compressing logs.
package rollwriter

import (
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/lestrrat-go/strftime"
)

const (
	// backupTimeFormat is the reference-time layout appended to a rolled
	// file's name when it is backed up on reaching MaxSize.
	backupTimeFormat = "bk-20060102-150405.00000"
	// compressSuffix is the extension given to gzip-compressed backups.
	compressSuffix = ".gz"
)

// ensure we always implement io.WriteCloser.
var _ io.WriteCloser = (*RollWriter)(nil)

// RollWriter is a file log writer which support rolling by size or datetime.
// It implements io.WriteCloser.
type RollWriter struct {
	filePath string   // base log file path as passed to NewRollWriter
	opts     *Options // rolling/scavenging/compression options

	pattern  *strftime.Strftime // filePath + TimeFormat, used to derive the current path
	currDir  string             // directory part of filePath
	currPath string             // path of the file currently being written
	currSize int64              // accessed atomically; bytes written to the current file
	currFile atomic.Value       // holds the current *os.File
	openTime int64              // accessed atomically; unix time of the last (re)open

	mu         sync.Mutex // serializes reopen/backup of the current file
	notifyOnce sync.Once  // guards lazy start of the clean goroutine
	notifyCh   chan bool  // wakes the clean goroutine to scan for old logs
	closeOnce  sync.Once  // guards lazy start of the delayed-close goroutine
	closeCh    chan *os.File // files queued for delayed close
}

// NewRollWriter creates a new RollWriter.
func NewRollWriter(filePath string, opt ...Option) (*RollWriter, error) { opts := &Options{ MaxSize: 0, // default no rolling by file size MaxAge: 0, // default no scavenging on expired logs MaxBackups: 0, // default no scavenging on redundant logs Compress: false, // default no compressing } // opt has the highest priority and should overwrite the original one. for _, o := range opt { o(opts) } if filePath == "" { return nil, errors.New("invalid file path") } pattern, err := strftime.New(filePath + opts.TimeFormat) if err != nil { return nil, errors.New("invalid time pattern") } w := &RollWriter{ filePath: filePath, opts: opts, pattern: pattern, currDir: filepath.Dir(filePath), } if err := os.MkdirAll(w.currDir, 0755); err != nil { return nil, err } return w, nil } // Write writes logs. It implements io.Writer. func (w *RollWriter) Write(v []byte) (n int, err error) { // reopen file every 10 seconds. if w.getCurrFile() == nil || time.Now().Unix()-atomic.LoadInt64(&w.openTime) > 10 { w.mu.Lock() w.reopenFile() w.mu.Unlock() } // return when failed to open the file. if w.getCurrFile() == nil { return 0, errors.New("open file fail") } // write logs to file. n, err = w.getCurrFile().Write(v) atomic.AddInt64(&w.currSize, int64(n)) // rolling on full if w.opts.MaxSize > 0 && atomic.LoadInt64(&w.currSize) >= w.opts.MaxSize { w.mu.Lock() w.backupFile() w.mu.Unlock() } return n, err } // Close closes the current log file. It implements io.Closer. func (w *RollWriter) Close() error { if w.getCurrFile() == nil { return nil } err := w.getCurrFile().Close() w.setCurrFile(nil) if w.notifyCh != nil { close(w.notifyCh) w.notifyCh = nil } if w.closeCh != nil { close(w.closeCh) w.closeCh = nil } return err } // getCurrFile returns the current log file. func (w *RollWriter) getCurrFile() *os.File { if file, ok := w.currFile.Load().(*os.File); ok { return file } return nil } // setCurrFile sets the current log file. 
func (w *RollWriter) setCurrFile(file *os.File) {
	w.currFile.Store(file)
}

// reopenFile reopen the file regularly. It notifies the scavenger if file path has changed.
// Caller must hold w.mu; the openTime condition is re-checked here so that
// only one of the writers racing past the unlocked check in Write actually
// reopens the file.
func (w *RollWriter) reopenFile() {
	if w.getCurrFile() == nil || time.Now().Unix()-atomic.LoadInt64(&w.openTime) > 10 {
		atomic.StoreInt64(&w.openTime, time.Now().Unix())
		currPath := w.pattern.FormatString(time.Now())
		if w.currPath != currPath {
			// Datetime-based rollover: the formatted path changed, so wake
			// the scavenger to clean/compress older files.
			w.currPath = currPath
			w.notify()
		}
		_ = w.doReopenFile(w.currPath)
	}
}

// doReopenFile reopen the file.
func (w *RollWriter) doReopenFile(path string) error {
	atomic.StoreInt64(&w.openTime, time.Now().Unix())
	lastFile := w.getCurrFile()
	of, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
	if err == nil {
		w.setCurrFile(of)
		if lastFile != nil {
			// delay closing until not used.
			w.delayCloseFile(lastFile)
		}
		// Seed currSize from the file on disk so size-based rolling keeps
		// working across reopens/restarts. The Stat error is deliberately
		// ignored: on failure currSize just keeps its previous value.
		st, _ := os.Stat(path)
		if st != nil {
			atomic.StoreInt64(&w.currSize, st.Size())
		}
	}
	return err
}

// backupFile backs this file up and reopen a new one if file size is too large.
// Caller must hold w.mu; the size is re-checked under the lock so only one of
// the racing writers performs the rename.
func (w *RollWriter) backupFile() {
	if w.opts.MaxSize > 0 && atomic.LoadInt64(&w.currSize) >= w.opts.MaxSize {
		atomic.StoreInt64(&w.currSize, 0)

		// rename the old file.
		newName := w.currPath + "." + time.Now().Format(backupTimeFormat)
		if _, e := os.Stat(w.currPath); !os.IsNotExist(e) {
			_ = os.Rename(w.currPath, newName)
		}

		// reopen a new one.
		_ = w.doReopenFile(w.currPath)
		w.notify()
	}
}

// notify runs scavengers.
// The clean goroutine is started lazily exactly once; the channel has
// capacity 1 and the send is non-blocking, so bursts of notifications
// collapse into a single pending scan.
func (w *RollWriter) notify() {
	w.notifyOnce.Do(func() {
		w.notifyCh = make(chan bool, 1)
		go w.runCleanFiles()
	})
	select {
	case w.notifyCh <- true:
	default:
	}
}

// runCleanFiles cleans redundant or expired (compressed) logs in a new goroutine.
func (w *RollWriter) runCleanFiles() {
	// Loop until notifyCh is closed by Close.
	for range w.notifyCh {
		if w.opts.MaxBackups == 0 && w.opts.MaxAge == 0 && !w.opts.Compress {
			continue
		}
		w.cleanFiles()
	}
}

// delayCloseFile delay closing file
// NOTE(review): this send panics if Close has already closed closeCh; the
// design assumes writers are stopped before Close is called — confirm at
// call sites.
func (w *RollWriter) delayCloseFile(file *os.File) {
	w.closeOnce.Do(func() {
		w.closeCh = make(chan *os.File, 100)
		go w.runCloseFiles()
	})
	w.closeCh <- file
}

// runCloseFiles delay closing file in a new goroutine.
func (w *RollWriter) runCloseFiles() {
	for f := range w.closeCh {
		// delay 20ms so a Write racing with the reopen can finish using
		// the old handle before it is closed.
		time.Sleep(20 * time.Millisecond)
		f.Close()
	}
}

// cleanFiles cleans redundant or expired (compressed) logs.
func (w *RollWriter) cleanFiles() {
	// get the file list of current log.
	files, err := w.getOldLogFiles()
	if err != nil || len(files) == 0 {
		return
	}

	// find the oldest files to scavenge.
	var compress, remove []logInfo
	files = filterByMaxBackups(files, &remove, w.opts.MaxBackups)

	// find the expired files by last modified time.
	files = filterByMaxAge(files, &remove, w.opts.MaxAge)

	// find files to compress by file extension .gz.
	filterByCompressExt(files, &compress, w.opts.Compress)

	// delete expired or redundant files.
	w.removeFiles(remove)

	// compress log files.
	w.compressFiles(compress)
}

// getOldLogFiles returns the log file list ordered by modified time.
func (w *RollWriter) getOldLogFiles() ([]logInfo, error) {
	files, err := ioutil.ReadDir(w.currDir)
	if err != nil {
		return nil, fmt.Errorf("can't read log file directory: %s", err)
	}

	logFiles := []logInfo{}
	filename := filepath.Base(w.filePath)
	for _, f := range files {
		if f.IsDir() {
			continue
		}
		// Keep only files that look like rotated variants of our log file.
		if modTime, err := w.matchLogFile(f.Name(), filename); err == nil {
			logFiles = append(logFiles, logInfo{modTime, f})
		}
	}
	// Newest first (byFormatTime sorts by timestamp descending).
	sort.Sort(byFormatTime(logFiles))
	return logFiles, nil
}

// matchLogFile checks whether current log file matches all relative log files, if matched, returns
// the modified time.
func (w *RollWriter) matchLogFile(filename, filePrefix string) (time.Time, error) {
	// exclude current log file.
	// a.log
	// a.log.20200712
	if filepath.Base(w.currPath) == filename {
		return time.Time{}, errors.New("ignore current logfile")
	}

	// match all log files with current log file.
	// a.log -> a.log.20200712-1232/a.log.20200712-1232.gz
	// a.log.20200712 -> a.log.20200712.20200712-1232/a.log.20200712.20200712-1232.gz
	if !strings.HasPrefix(filename, filePrefix) {
		return time.Time{}, errors.New("mismatched prefix")
	}

	// Stat error is folded into the nil check below; any failure maps to
	// the generic "file stat fail" error.
	if st, _ := os.Stat(filepath.Join(w.currDir, filename)); st != nil {
		return st.ModTime(), nil
	}
	return time.Time{}, errors.New("file stat fail")
}

// removeFiles deletes expired or redundant log files.
// Removal errors are ignored: cleanup is best-effort and retried on the
// next notification.
func (w *RollWriter) removeFiles(remove []logInfo) {
	// clean expired or redundant files.
	for _, f := range remove {
		os.Remove(filepath.Join(w.currDir, f.Name()))
	}
}

// compressFiles compresses demanded log files.
func (w *RollWriter) compressFiles(compress []logInfo) {
	// compress log files; failures are ignored (best-effort, retried on
	// the next scan because the uncompressed file remains).
	for _, f := range compress {
		fn := filepath.Join(w.currDir, f.Name())
		_ = compressFile(fn, fn+compressSuffix)
	}
}

// filterByMaxBackups filters redundant files that exceeded the limit.
// A backup and its compressed twin (same name plus ".gz") count as a single
// entry, which is why preserved is keyed by the name with compressSuffix
// trimmed. files is expected newest-first, so the oldest entries overflow
// into remove.
func filterByMaxBackups(files []logInfo, remove *[]logInfo, maxBackups int) []logInfo {
	if maxBackups == 0 || len(files) < maxBackups {
		return files
	}
	var remaining []logInfo
	preserved := make(map[string]bool)
	for _, f := range files {
		fn := strings.TrimSuffix(f.Name(), compressSuffix)
		preserved[fn] = true
		if len(preserved) > maxBackups {
			*remove = append(*remove, f)
		} else {
			remaining = append(remaining, f)
		}
	}
	return remaining
}

// filterByMaxAge filters expired files.
func filterByMaxAge(files []logInfo, remove *[]logInfo, maxAge int) []logInfo { if maxAge <= 0 { return files } var remaining []logInfo diff := time.Duration(int64(24*time.Hour) * int64(maxAge)) cutoff := time.Now().Add(-1 * diff) for _, f := range files { if f.timestamp.Before(cutoff) { *remove = append(*remove, f) } else { remaining = append(remaining, f) } } return remaining } // filterByCompressExt filters all compressed files. func filterByCompressExt(files []logInfo, compress *[]logInfo, needCompress bool) { if !needCompress { return } for _, f := range files { if !strings.HasSuffix(f.Name(), compressSuffix) { *compress = append(*compress, f) } } } // compressFile compresses file src to dst, and removes src on success. func compressFile(src, dst string) (err error) { f, err := os.Open(src) if err != nil { return fmt.Errorf("failed to open file: %v", err) } defer f.Close() gzf, err := os.OpenFile(dst, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666) if err != nil { return fmt.Errorf("failed to open compressed file: %v", err) } defer gzf.Close() gz := gzip.NewWriter(gzf) defer func() { gz.Close() if err != nil { os.Remove(dst) err = fmt.Errorf("failed to compress file: %v", err) } else { os.Remove(src) } }() if _, err := io.Copy(gz, f); err != nil { return err } return nil } // logInfo is an assistant struct which is used to return file name and last modified time. type logInfo struct { timestamp time.Time os.FileInfo } // byFormatTime sorts by time descending order. type byFormatTime []logInfo // Less checks whether the time of b[j] is early than the time of b[i]. func (b byFormatTime) Less(i, j int) bool { return b[i].timestamp.After(b[j].timestamp) } // Swap swaps b[i] and b[j]. func (b byFormatTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] } // Len returns the length of list b. func (b byFormatTime) Len() int { return len(b) }