log/rollwriter/async_roll_writer.go (98 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rollwriter
import (
"bytes"
"errors"
"io"
"time"
"github.com/hashicorp/go-multierror"
)
// AsyncRollWriter is the asynchronous rolling log writer which implements zapcore.WriteSyncer.
//
// Write only enqueues a copy of each log entry. A background goroutine started
// by NewAsyncRollWriter batches the queued entries and writes them to logger.
// Sync and Close communicate with that goroutine through the channels below.
type AsyncRollWriter struct {
// logger is the underlying destination that receives the batched log data.
logger io.WriteCloser
// opts holds the queue size, batch size, flush interval and drop policy.
opts *AsyncOptions
// logQueue carries log entries from Write to the background goroutine.
logQueue chan []byte
// sync asks the background goroutine to flush; the result comes back on syncErr.
sync chan struct{}
syncErr chan error
// close is closed by Close to stop the background goroutine; the result of
// closing logger comes back on closeErr.
close chan struct{}
closeErr chan error
}
// NewAsyncRollWriter creates a new AsyncRollWriter that writes to logger and
// starts the background goroutine which batches the writes.
//
// Defaults are a queue of 10000 entries, a batch size of 4 KiB, a flush
// interval of 100 ms and no dropping of logs; every opt may override them.
// Values that cannot work are replaced by the defaults instead of crashing the
// process: a negative LogQueueSize or WriteLogSize would panic in make, and a
// non-positive WriteLogInterval would make time.NewTicker panic inside the
// background goroutine, where the caller has no chance to recover.
//
// The background goroutine runs until Close is called on the returned writer.
func NewAsyncRollWriter(logger io.WriteCloser, opt ...AsyncOption) *AsyncRollWriter {
	const (
		defaultLogQueueSize     = 10000    // entries
		defaultWriteLogSize     = 4 * 1024 // bytes
		defaultWriteLogInterval = 100      // milliseconds
	)

	opts := &AsyncOptions{
		LogQueueSize:     defaultLogQueueSize,
		WriteLogSize:     defaultWriteLogSize,
		WriteLogInterval: defaultWriteLogInterval,
		DropLog:          false, // by default block the caller rather than drop logs
	}
	for _, o := range opt {
		o(opts)
	}

	// Zero sizes are legal (unbuffered queue, flush on every entry); only
	// values that would panic are corrected.
	if opts.LogQueueSize < 0 {
		opts.LogQueueSize = defaultLogQueueSize
	}
	if opts.WriteLogSize < 0 {
		opts.WriteLogSize = defaultWriteLogSize
	}
	if opts.WriteLogInterval <= 0 {
		opts.WriteLogInterval = defaultWriteLogInterval
	}

	// The signal channels are unbuffered so that Sync and Close rendezvous
	// with the background goroutine.
	w := &AsyncRollWriter{
		logger:   logger,
		opts:     opts,
		logQueue: make(chan []byte, opts.LogQueueSize),
		sync:     make(chan struct{}),
		syncErr:  make(chan error),
		close:    make(chan struct{}),
		closeErr: make(chan error),
	}

	// Start the goroutine that writes the queued logs in batches.
	go w.batchWriteLog()
	return w
}
// Write hands a private copy of data to the background writer and reports
// len(data) on success. It implements io.Writer.
//
// The copy is needed because an io.Writer must not retain the caller's slice.
// With DropLog enabled the call never blocks: when the queue is full the entry
// is discarded and an error is returned. Otherwise the call waits until the
// queue has room.
func (w *AsyncRollWriter) Write(data []byte) (int, error) {
	entry := make([]byte, len(data))
	copy(entry, data)

	if !w.opts.DropLog {
		// Blocking mode: wait for a free slot in the queue.
		w.logQueue <- entry
		return len(data), nil
	}

	// Dropping mode: give up immediately when the queue is full.
	select {
	case w.logQueue <- entry:
		return len(data), nil
	default:
		return 0, errors.New("log queue is full")
	}
}
// Sync asks the background goroutine to write out all logs accepted so far and
// returns the write error it reports, if any. It implements
// zapcore.WriteSyncer.
//
// Once the writer has been closed the background goroutine has exited and
// nobody receives from w.sync, so an unconditional send would block forever.
// Sync therefore also watches w.close and returns an error instead of hanging.
func (w *AsyncRollWriter) Sync() error {
	select {
	case w.sync <- struct{}{}:
		// The goroutine accepted the request; it answers every request on syncErr.
		return <-w.syncErr
	case <-w.close:
		return errors.New("async roll writer is closed")
	}
}
// Close flushes pending logs, stops the background goroutine and closes the
// underlying logger. It implements io.Closer.
//
// The returned error combines the flush error and the logger's close error; it
// is nil when both succeeded. Calling Close again after it has returned is a
// no-op that returns nil. Close must not be called concurrently with itself:
// two racing calls could both pass the guard and close the channel twice.
func (w *AsyncRollWriter) Close() error {
	select {
	case <-w.close:
		// Already closed: the background goroutine is gone, so there is
		// nothing left to flush or close, and closing w.close a second time
		// would panic.
		return nil
	default:
	}

	err := w.Sync()
	// Closing the channel tells the background goroutine to close the logger
	// and exit; its result arrives on closeErr.
	close(w.close)
	return multierror.Append(err, <-w.closeErr).ErrorOrNil()
}
// batchWriteLog is the background goroutine that moves logs from logQueue to
// the underlying logger in batches. It returns only after the close signal.
//
// Entries are collected in an in-memory buffer which is written out when
//   - it holds at least WriteLogSize bytes,
//   - the WriteLogInterval ticker fires,
//   - a Sync request arrives: the buffer and every queued entry are written
//     and the combined write error is reported on syncErr, or
//   - the close signal arrives: the buffer and every queued entry are written
//     so that logs accepted after the last Sync are not lost, then the logger
//     is closed and the combined error is reported on closeErr.
func (w *AsyncRollWriter) batchWriteLog() {
	// Twice the batch size leaves room for the entry that crosses the threshold.
	buffer := bytes.NewBuffer(make([]byte, 0, w.opts.WriteLogSize*2))
	ticker := time.NewTicker(time.Millisecond * time.Duration(w.opts.WriteLogInterval))
	defer ticker.Stop()

	// flush writes the buffered bytes, if any, in one call and empties the
	// buffer whether or not the write succeeded.
	flush := func() error {
		if buffer.Len() == 0 {
			return nil
		}
		_, err := w.logger.Write(buffer.Bytes())
		buffer.Reset()
		return err
	}

	// drain flushes the buffer and then writes every entry that is in the
	// queue at the time of the call, collecting all write errors. The buffer
	// goes first so that the original order of the entries is kept.
	drain := func() error {
		var err error
		err = multierror.Append(err, flush()).ErrorOrNil()
		for pending := len(w.logQueue); pending > 0; pending-- {
			_, e := w.logger.Write(<-w.logQueue)
			err = multierror.Append(err, e).ErrorOrNil()
		}
		return err
	}

	for {
		select {
		case <-ticker.C:
			// Best effort: a periodic flush has no caller to report an error to.
			_ = flush()
		case data := <-w.logQueue:
			buffer.Write(data)
			if buffer.Len() >= w.opts.WriteLogSize {
				// Best effort for the same reason as the periodic flush.
				_ = flush()
			}
		case <-w.sync:
			w.syncErr <- drain()
		case <-w.close:
			err := drain()
			w.closeErr <- multierror.Append(err, w.logger.Close()).ErrorOrNil()
			return
		}
	}
}