pkg/wal/repository.go (220 lines of code) (raw):
package wal
import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"text/tabwriter"
"time"
flakeutil "github.com/Azure/adx-mon/pkg/flake"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/partmap"
)
// Repository is a collection of WALs.
//
// Each WAL is stored under a string key (its prefix) and all WALs created by
// the repository are handed the same Index.
type Repository struct {
// opts is the configuration supplied to NewRepository.
opts RepositoryOpts
// index receives a SegmentInfo for every segment discovered by Open and is
// passed to every WAL the repository creates.
index *Index
// wals maps a key/prefix to its WAL.
wals *partmap.Map[*WAL]
}
// RepositoryOpts configures a Repository and the WALs it creates.
type RepositoryOpts struct {
// StorageDir is the directory Open scans for ".wal" segment files. It must
// be non-empty; Open creates it if it does not exist. It is also forwarded
// to every WAL.
StorageDir string
// SegmentMaxSize is forwarded to WALOpts.SegmentMaxSize.
// NOTE(review): presumably the size in bytes at which a segment rotates — confirm in WAL.
SegmentMaxSize int64
// SegmentMaxAge is forwarded to WALOpts.SegmentMaxAge.
// NOTE(review): presumably the age at which a segment rotates — confirm in WAL.
SegmentMaxAge time.Duration
// MaxDiskUsage is forwarded to WALOpts.MaxDiskUsage.
// NOTE(review): presumably a disk usage limit in bytes — confirm in WAL.
MaxDiskUsage int64
// MaxSegmentCount is not read by any Repository method visible here.
// NOTE(review): confirm whether it should be forwarded to WALOpts.
MaxSegmentCount int
// WALFlushInterval is forwarded to WALOpts.WALFlushInterval.
WALFlushInterval time.Duration
// EnableWALFsync is forwarded to WALOpts.EnableWALFsync.
EnableWALFsync bool
}
// NewRepository builds a Repository from opts with a fresh Index and an empty
// WAL map. Nothing is read from disk until Open is called.
func NewRepository(opts RepositoryOpts) *Repository {
	repo := new(Repository)
	repo.opts = opts
	repo.index = NewIndex()
	// NOTE(review): 64 is passed straight to partmap.NewMap; presumably the
	// partition count — confirm against partmap.
	repo.wals = partmap.NewMap[*WAL](64)
	return repo
}
// Open prepares the storage directory and registers every WAL segment already
// present in it.
//
// The storage directory is created when missing. Each regular file with a
// ".wal" extension is then processed:
//   - files that IsSegment rejects are deleted;
//   - the segment is opened and closed once so that Open can repair it;
//   - segments whose file name or flake ID cannot be parsed are skipped;
//   - segments that are exactly 8 bytes (header only) are deleted;
//   - everything else is added to the index, and a WAL is created and opened
//     for its prefix if one is not registered yet.
//
// Per-file problems are logged and skipped. An error is returned only when the
// storage directory is unusable, listing it fails, or a WAL cannot be created.
func (s *Repository) Open(ctx context.Context) error {
	if s.opts.StorageDir == "" {
		return fmt.Errorf("storage dir is required")
	}

	stat, err := os.Stat(s.opts.StorageDir)
	switch {
	case os.IsNotExist(err):
		if err := os.MkdirAll(s.opts.StorageDir, 0755); err != nil {
			return err
		}
	case err != nil:
		return err
	case !stat.IsDir():
		return fmt.Errorf("storage dir is not a directory")
	}

	dir, err := os.Open(s.opts.StorageDir)
	if err != nil {
		return err
	}
	// Release the directory handle on every return path. It is only read
	// from, so a failure to close it carries no information worth surfacing.
	defer dir.Close()

	for {
		// Read the listing in batches of 100; ReadDir reports io.EOF once the
		// directory is exhausted.
		entries, err := dir.ReadDir(100)
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}

		for _, d := range entries {
			path := filepath.Join(s.opts.StorageDir, d.Name())
			if d.IsDir() || filepath.Ext(path) != ".wal" {
				continue
			}

			// This block was added when we had a non-backwards-compatible change in the segment
			// file format. To simplify the migration, we just remove any segment files that are not in the
			// expected format. In the future, we will have a versioned segment file format to avoid this.
			if !IsSegment(path) {
				logger.Warnf("Segment file is not a WAL segment file: %s. Removing", path)
				if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
					logger.Warnf("Failed to remove invalid segment file: %s %s", path, err.Error())
				}
				continue
			}

			// Walk each segment on disk and Open it to trigger a repair if necessary.
			// Failures are logged only; the segment is still indexed below.
			if seg, err := Open(path); err != nil {
				logger.Warnf("Failed to open segment file: %s %s", path, err.Error())
			} else if err := seg.Close(); err != nil {
				logger.Warnf("Failed to close segment file: %s %s", path, err.Error())
			}

			database, table, schema, epoch, err := ParseFilename(filepath.Base(path))
			if err != nil {
				logger.Warnf("Failed to parse segment file name: %s %s", path, err.Error())
				continue
			}

			// The WAL key is "<database>_<table>", with "_<schema>" appended when present.
			prefix := fmt.Sprintf("%s_%s", database, table)
			if schema != "" {
				prefix = fmt.Sprintf("%s_%s", prefix, schema)
			}

			createdAt, err := flakeutil.ParseFlakeID(epoch)
			if err != nil {
				logger.Warnf("Failed to parse flake id: %s %s", epoch, err.Error())
				continue
			}

			fi, err := d.Info()
			if err != nil {
				logger.Warnf("Failed to get file info: %s %s", path, err.Error())
				continue
			}
			size := fi.Size()

			// If the segment is only 8 bytes, that means only the segment magic header has been written and there
			// is no data in the file. We don't want to upload these to kusto so they can be removed.
			if size == 8 {
				if logger.IsDebug() {
					logger.Debugf("Removing empty segment: %s", path)
				}
				if err := os.Remove(path); err != nil {
					logger.Warnf("Failed to remove empty segment: %s %s", path, err.Error())
				}
				continue
			}

			s.index.Add(SegmentInfo{
				Prefix:    prefix,
				Ulid:      epoch,
				Path:      path,
				Size:      size,
				CreatedAt: createdAt,
			})

			// Only one WAL per prefix; later segments of the same prefix reuse it.
			if _, ok := s.wals.Get(prefix); ok {
				continue
			}

			wal, err := s.newWAL(ctx, prefix)
			if err != nil {
				return err
			}
			s.wals.Set(prefix, wal)
		}
	}

	return nil
}
// Close calls Close on every WAL held by the repository and returns whatever
// error the map iteration reports, or nil if there was none.
func (s *Repository) Close() error {
	return s.wals.Each(func(_ string, w *WAL) error {
		return w.Close()
	})
}
// newWAL constructs a WAL for prefix from the repository options and the
// shared index, then opens it with ctx. It does not register the WAL in the
// map; callers do that. On any failure it returns a nil WAL and the error.
//
// NOTE(review): RepositoryOpts.MaxSegmentCount is not forwarded here — confirm
// whether WALOpts should receive it.
func (s *Repository) newWAL(ctx context.Context, prefix string) (*WAL, error) {
	w, err := NewWAL(WALOpts{
		Prefix:           prefix,
		StorageDir:       s.opts.StorageDir,
		SegmentMaxSize:   s.opts.SegmentMaxSize,
		SegmentMaxAge:    s.opts.SegmentMaxAge,
		MaxDiskUsage:     s.opts.MaxDiskUsage,
		Index:            s.index,
		WALFlushInterval: s.opts.WALFlushInterval,
		EnableWALFsync:   s.opts.EnableWALFsync,
	})
	if err != nil {
		return nil, err
	}

	if openErr := w.Open(ctx); openErr != nil {
		return nil, openErr
	}
	return w, nil
}
// Get returns the WAL stored under key. A constructor that builds and opens a
// new WAL with ctx is handed to GetOrCreate, which is expected to call it only
// when no WAL exists for the key yet. On error the returned WAL is nil.
func (s *Repository) Get(ctx context.Context, key []byte) (*WAL, error) {
	name := string(key)
	create := func() (*WAL, error) {
		return s.newWAL(ctx, name)
	}

	found, err := s.wals.GetOrCreate(name, create)
	if err != nil {
		return nil, err
	}
	return found, nil
}
// Count reports the number of WALs currently held in the repository's map.
func (s *Repository) Count() int {
	total := s.wals.Count()
	return total
}
// Remove calls RemoveAll on the WAL stored under key and then drops it from
// the map. An unknown key is a no-op that returns nil. If RemoveAll fails, the
// error is returned and the WAL stays registered.
func (s *Repository) Remove(key []byte) error {
	name := string(key)

	target, found := s.wals.Get(name)
	if !found {
		return nil
	}

	if err := target.RemoveAll(); err != nil {
		return err
	}

	s.wals.Delete(name)
	return nil
}
// Keys returns the key of every WAL in the repository as byte slices, sorted
// in ascending bytewise order. The result is a non-nil slice even when empty.
func (s *Repository) Keys() [][]byte {
	out := make([][]byte, 0, s.wals.Count())

	collect := func(name string, _ *WAL) error {
		out = append(out, []byte(name))
		return nil
	}
	// collect never fails, so the iteration result is not inspected.
	s.wals.Each(collect)

	sort.Slice(out, func(a, b int) bool {
		return bytes.Compare(out[a], out[b]) < 0
	})
	return out
}
// Index returns the repository's segment index, the same instance that is
// passed to every WAL the repository creates.
func (s *Repository) Index() *Index {
	idx := s.index
	return idx
}
// RemoveSegment removes si from the repository's index by delegating to
// Index.Remove.
func (s *Repository) RemoveSegment(si SegmentInfo) {
	idx := s.index
	idx.Remove(si)
}
// PrefixesByAge returns the result of the index's PrefixesByAge.
// NOTE(review): presumably the known prefixes ordered by segment age — confirm
// the ordering against Index.PrefixesByAge.
func (s *Repository) PrefixesByAge() []string {
	prefixes := s.index.PrefixesByAge()
	return prefixes
}
// WriteDebug writes diagnostic output to w: first the index's own debug
// output, then a summary line with the combined Size of all WALs and the
// number of WALs, and a tab-aligned table with one row per WAL (prefix, path,
// size).
//
// It returns the first error from the index, the WAL iteration, or the final
// table flush. Errors from the individual writes are ignored.
//
// NOTE(review): the summary is written directly to w before the table is
// flushed, so it likely appears ahead of the table rows, which text/tabwriter
// buffers until Flush — confirm this ordering is intended. The "Segments"
// figure counts WALs, not segments.
func (s *Repository) WriteDebug(w io.Writer) error {
	if err := s.index.WriteDebug(w); err != nil {
		return err
	}

	var (
		totalSize int
		walCount  int
	)

	table := tabwriter.NewWriter(w, 4, 0, 2, ' ', 0)
	_, _ = io.WriteString(table, "Prefix\tPath\tDisk Usage\n")

	err := s.wals.Each(func(prefix string, wal *WAL) error {
		totalSize += wal.Size()
		walCount++
		_, _ = fmt.Fprintf(table, "%s\t%s\t%d\n", prefix, wal.Path(), wal.Size())
		return nil
	})
	if err != nil {
		return err
	}

	_, _ = fmt.Fprintf(w, "\nWAL: Disk Usage: %d, Segments: %d\n", totalSize, walCount)
	return table.Flush()
}