pkg/wal/wal.go:
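// Package wal implements a write-ahead log backed by segment files on disk.
// Writes go to a single active segment; a background loop rotates that segment
// once it exceeds the configured maximum size or age, and closed segments are
// tracked by an Index until they are removed.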
package wal
import (
"bufio"
"context"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/pool"
"github.com/davidnarayan/go-flake"
)
// DefaultIOBufSize is the default buffer size for bufio.Writer.
const DefaultIOBufSize = 4 * 1024
var (
ErrMaxDiskUsageExceeded = fmt.Errorf("max disk usage exceeded")
ErrMaxSegmentsExceeded = fmt.Errorf("max segments exceeded")
ErrMaxSegmentSizeExceeded = fmt.Errorf("max segment size exceeded")
ErrSegmentClosed = fmt.Errorf("segment closed")
ErrSegmentLocked = fmt.Errorf("segment locked")
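// idgen generates unique, time-ordered IDs using go-flake.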
idgen *flake.Flake
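// bwPool pools 8 KiB bufio.Writers for reuse to limit allocations.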
bwPool = pool.NewGeneric(10000, func(sz int) interface{} {
return bufio.NewWriterSize(nil, 8*1024)
})
)
func init() {
var err error
idgen, err = flake.New()
if err != nil {
panic(err)
}
}
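// WAL writes data to an active segment file and tracks closed segments in an index.
// It is safe for concurrent use.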
type WAL struct {
opts WALOpts
schemaPath string
// index is the index of closed wal segments. The active segment is not part of the index.
index *Index
sampleMetadataBuffer [12]byte
closeFn context.CancelFunc
wg sync.WaitGroup
mu sync.RWMutex
closed bool
segment Segment
// inflightWriteBytes is the total number of bytes from writes currently in progress that have not yet
// been written to the active segment.
inflightWriteBytes int64
// segmentSize tracks the size of the current segment. This is tracked separately from Segment.Size() because
// the latter requires taking an RLock on the segment, which creates lock contention.
segmentSize int64
// segmentCreatedAt is the Unix timestamp (in seconds) when the current segment was created. This is tracked
// separately from Segment itself to avoid lock contention.
segmentCreatedAt int64
}
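// SegmentInfo describes a WAL segment file and is what the Index tracks for closed segments.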
type SegmentInfo struct {
Prefix string
Ulid string
Path string
Size int64
CreatedAt time.Time
}
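// WALOpts configures a WAL. StorageDir is required; limits set to zero are disabled.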
type WALOpts struct {
StorageDir string
// Prefix is the prefix used for WAL segment file names.
Prefix string
// SegmentMaxSize is the max size of a segment in bytes before it will be rotated and compressed.
SegmentMaxSize int64
// SegmentMaxAge is the max age of a segment before it will be rotated and compressed.
SegmentMaxAge time.Duration
// MaxDiskUsage is the max disk usage of WAL segments allowed before writes should be rejected.
MaxDiskUsage int64
// MaxSegmentCount is the max number of segments allowed before writes should be rejected.
MaxSegmentCount int
// Index is the index of closed WAL segments. If nil, NewWAL creates a new index.
Index *Index
// WALFlushInterval is the interval at which the WAL should be flushed.
WALFlushInterval time.Duration
// EnableWALFsync enables fsync of the segment after every flush.
EnableWALFsync bool
}
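// SampleType identifies the kind of telemetry carried by a sample.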
type SampleType uint16
const (
UnknownSampleType SampleType = iota
MetricSampleType
TraceSampleType
LogSampleType
)
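// WriteOptions is a functional option passed through to Segment.Write; it receives a byte slice of
// write metadata that it may modify.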
type WriteOptions func([]byte)
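// NewWAL returns a WAL configured by opts. It returns an error if opts.StorageDir is empty.
// If opts.Index is nil, a new Index is created.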
func NewWAL(opts WALOpts) (*WAL, error) {
if opts.StorageDir == "" {
return nil, fmt.Errorf("wal storage dir not defined")
}
if opts.Index == nil {
opts.Index = NewIndex()
}
return &WAL{
index: opts.Index,
opts: opts,
}, nil
}
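// Open starts the background segment rotation loop. The WAL's lifetime is controlled by Close; the
// passed context is not used to cancel it.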
func (w *WAL) Open(ctx context.Context) error {
ctx, w.closeFn = context.WithCancel(context.Background())
w.mu.Lock()
defer w.mu.Unlock()
w.wg.Add(1)
go w.rotate(ctx)
return nil
}
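// Close stops the rotation loop and closes the active segment, adding it to the index of closed segments.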
func (w *WAL) Close() error {
w.closeFn()
w.wg.Wait()
w.mu.Lock()
defer w.mu.Unlock()
w.closed = true
if w.segment != nil {
info := w.segment.Info()
if err := w.segment.Close(); err != nil {
return err
}
w.index.Add(info)
w.segment = nil
}
return nil
}
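// Write writes buf to the active segment, creating the segment if one does not exist. If the segment
// has reached its configured size limit, it is rotated and the write is retried once. Write returns
// ErrMaxDiskUsageExceeded or ErrMaxSegmentsExceeded when the corresponding limits have been reached.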
func (w *WAL) Write(ctx context.Context, buf []byte, opts ...WriteOptions) error {
atomic.AddInt64(&w.inflightWriteBytes, int64(len(buf)))
defer atomic.AddInt64(&w.inflightWriteBytes, -int64(len(buf)))
// Optimistically try to write, but the segment might rotate in the meantime.
// If it does, retry the write one more time.
n, err := w.tryWrite(ctx, buf, opts...)
if errors.Is(err, ErrMaxSegmentSizeExceeded) {
w.rotateSegmentIfNecessary()
n, err = w.tryWrite(ctx, buf, opts...)
atomic.AddInt64(&w.segmentSize, int64(n))
return err
} else if errors.Is(err, ErrSegmentClosed) {
n, err = w.tryWrite(ctx, buf, opts...)
atomic.AddInt64(&w.segmentSize, int64(n))
return err
}
atomic.AddInt64(&w.segmentSize, int64(n))
return err
}
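// tryWrite validates the configured limits and writes buf to the active segment, creating the segment
// under the write lock if one does not exist. It returns the number of bytes written.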
func (w *WAL) tryWrite(ctx context.Context, buf []byte, opts ...WriteOptions) (int, error) {
var seg Segment
if err := w.validateLimits(); err != nil {
return 0, err
}
// fast path
w.mu.RLock()
if w.segment != nil {
seg = w.segment
w.mu.RUnlock()
return seg.Write(ctx, buf, opts...)
}
w.mu.RUnlock()
w.mu.Lock()
if w.segment == nil {
seg, err := NewSegment(w.opts.StorageDir, w.opts.Prefix,
WithFlushIntervale(w.opts.WALFlushInterval),
WithFsync(w.opts.EnableWALFsync))
if err != nil {
w.mu.Unlock()
return 0, err
}
w.segment = seg
}
seg = w.segment
w.mu.Unlock()
return seg.Write(ctx, buf, opts...)
}
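// validateLimits returns an error if the configured disk usage, segment count, or segment size limit
// has been reached.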
func (w *WAL) validateLimits() error {
if w.opts.MaxDiskUsage > 0 && w.index.TotalSize()+atomic.LoadInt64(&w.inflightWriteBytes) >= w.opts.MaxDiskUsage {
return ErrMaxDiskUsageExceeded
}
if w.opts.MaxSegmentCount > 0 && w.index.TotalSegments() >= w.opts.MaxSegmentCount {
return ErrMaxSegmentsExceeded
}
if w.opts.SegmentMaxSize > 0 && atomic.LoadInt64(&w.segmentSize)+atomic.LoadInt64(&w.inflightWriteBytes) >= w.opts.SegmentMaxSize {
return ErrMaxSegmentSizeExceeded
}
return nil
}
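// Size returns the size in bytes of the active segment, or 0 if no segment is open.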
func (w *WAL) Size() int {
w.mu.RLock()
defer w.mu.RUnlock()
if w.segment == nil {
return 0
}
return int(w.segment.Size())
}
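// Segment returns the active segment, or nil if no segment is open.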
func (w *WAL) Segment() Segment {
w.mu.RLock()
defer w.mu.RUnlock()
return w.segment
}
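// rotate periodically checks whether the active segment should be rotated until ctx is cancelled.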
func (w *WAL) rotate(ctx context.Context) {
defer w.wg.Done()
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
w.rotateSegmentIfNecessary()
}
}
}
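// requiresRotation reports whether the active segment has reached its configured maximum size or age.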
func (w *WAL) requiresRotation() bool {
return (w.opts.SegmentMaxSize > 0 && atomic.LoadInt64(&w.segmentSize)+atomic.LoadInt64(&w.inflightWriteBytes) >= w.opts.SegmentMaxSize) ||
(w.opts.SegmentMaxAge.Seconds() > 0 && time.Since(time.Unix(w.segmentCreatedAt, 0)) >= w.opts.SegmentMaxAge)
}
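// rotateSegmentIfNecessary swaps in a new active segment when the current one has reached its size or
// age limit. The old segment is added to the index if it contains data, otherwise it is deleted.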
func (w *WAL) rotateSegmentIfNecessary() {
if w.requiresRotation() {
w.mu.Lock()
// Re-verify rotation is needed under write lock since the fast path check is racy
if !w.requiresRotation() {
w.mu.Unlock()
return
}
toClose := w.segment
var err error
w.segment, err = NewSegment(w.opts.StorageDir, w.opts.Prefix,
WithFlushIntervale(w.opts.WALFlushInterval),
WithFsync(w.opts.EnableWALFsync))
if err != nil {
logger.Errorf("Failed to create new segment: %s", err.Error())
w.segment = nil
atomic.StoreInt64(&w.segmentSize, 0)
atomic.StoreInt64(&w.segmentCreatedAt, 0)
} else {
atomic.StoreInt64(&w.segmentSize, w.segment.Size())
atomic.StoreInt64(&w.segmentCreatedAt, w.segment.CreatedAt().Unix())
}
w.mu.Unlock()
if toClose != nil {
// 8 bytes is the size of the segment magic header bytes. If that is all we've written, we can just
// delete it so that we don't end up uploading empty segments to Kusto.
if toClose.Size() > 8 {
info := toClose.Info()
w.index.Add(info)
} else {
_ = os.Remove(toClose.Path())
}
if err := toClose.Close(); err != nil {
logger.Errorf("Failed to close segment: %s %s", toClose.Path(), err.Error())
}
}
}
}
// Path returns the path of the active segment.
func (w *WAL) Path() string {
w.mu.RLock()
defer w.mu.RUnlock()
return w.path()
}
func (w *WAL) path() string {
if w.segment == nil {
return ""
}
return w.segment.Path()
}
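// Remove deletes the segment file at path. A missing file is not an error.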
func (w *WAL) Remove(path string) error {
err := os.Remove(path)
if os.IsNotExist(err) {
return nil
}
return err
}
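// Append appends buf to the active segment using Segment.Append, creating the segment if one does not
// exist and rotating it if it has reached its configured size limit.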
func (w *WAL) Append(ctx context.Context, buf []byte) error {
atomic.AddInt64(&w.inflightWriteBytes, int64(len(buf)))
defer atomic.AddInt64(&w.inflightWriteBytes, -int64(len(buf)))
n, err := w.tryAppend(ctx, buf)
if errors.Is(err, ErrMaxSegmentSizeExceeded) {
w.rotateSegmentIfNecessary()
n, err = w.tryAppend(ctx, buf)
atomic.AddInt64(&w.segmentSize, int64(n))
return err
} else if errors.Is(err, ErrSegmentClosed) {
n, err = w.tryAppend(ctx, buf)
atomic.AddInt64(&w.segmentSize, int64(n))
return err
}
atomic.AddInt64(&w.segmentSize, int64(n))
return err
}
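// tryAppend validates the configured limits and appends buf to the active segment, creating the
// segment under the write lock if one does not exist. It returns the number of bytes appended.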
func (w *WAL) tryAppend(ctx context.Context, buf []byte) (int, error) {
var seg Segment
if err := w.validateLimits(); err != nil {
return 0, err
}
// fast path
w.mu.RLock()
if w.segment != nil {
seg = w.segment
w.mu.RUnlock()
return seg.Append(ctx, buf)
}
w.mu.RUnlock()
w.mu.Lock()
if w.segment == nil {
seg, err := NewSegment(w.opts.StorageDir, w.opts.Prefix,
WithFlushIntervale(w.opts.WALFlushInterval),
WithFsync(w.opts.EnableWALFsync))
if err != nil {
w.mu.Unlock()
return 0, err
}
w.segment = seg
}
seg = w.segment
w.mu.Unlock()
return seg.Append(ctx, buf)
}
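// RemoveAll deletes all closed segments tracked by the index as well as the active segment file.
// The WAL must be closed before calling RemoveAll.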
func (w *WAL) RemoveAll() error {
w.mu.Lock()
defer w.mu.Unlock()
if !w.closed {
return fmt.Errorf("wal not closed")
}
closed := w.index.Get(nil, w.opts.Prefix)
for _, info := range closed {
if err := w.Remove(info.Path); err != nil {
return err
}
w.index.Remove(info)
}
if w.segment != nil {
return w.Remove(w.segment.Path())
}
return nil
}
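// Flush flushes buffered data in the active segment, if any, to the underlying file.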
func (w *WAL) Flush() error {
w.mu.RLock()
defer w.mu.RUnlock()
if w.segment == nil {
return nil
}
return w.segment.Flush()
}