pkg/wal/segment.go

package wal

import (
	"bufio"
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	flakeutil "github.com/Azure/adx-mon/pkg/flake"
	"github.com/Azure/adx-mon/pkg/logger"
	adxsync "github.com/Azure/adx-mon/pkg/sync"
	"github.com/klauspost/compress/s2"
	gbp "github.com/libp2p/go-buffer-pool"
)

var (
	// blockHdrMagic is the magic number for the block header. If this is not present as the first 2 bytes of a block,
	// then extra metadata is not present.
	blockHdrMagic = [2]byte{0xAA, 0xAA} // 10101010 10101010 in binary

	// segmentMagic is the magic number for the segment file. If this is not present as the first 6 bytes of a segment,
	// then the file is not a valid segment file.
	segmentMagic = [8]byte{'A', 'D', 'X', 'W', 'A', 'L'}

	segmentVersion = byte(1)
)

type SegmentOption func(s *segment)

// WithFlushIntervale sets the interval at which buffered writes are flushed to disk.
func WithFlushIntervale(d time.Duration) SegmentOption {
	return func(s *segment) {
		if d.Seconds() > 0 {
			s.flushInterval = d
		}
	}
}

// WithFsync enables fsyncing the segment file when it is closed.
func WithFsync(b bool) SegmentOption {
	return func(s *segment) {
		s.enableFsync = b
	}
}

type Segment interface {
	Append(ctx context.Context, buf []byte) (int, error)
	Write(ctx context.Context, buf []byte, opts ...WriteOptions) (int, error)
	Bytes() ([]byte, error)
	Close() error
	ID() string
	Size() int64
	CreatedAt() time.Time
	Reader() (io.ReadCloser, error)
	Path() string
	Iterator() (Iterator, error)
	Info() SegmentInfo
	Flush() error

	// Repair truncates the last bytes in the segment if they are missing, corrupted or have extra data. This
	// repairs any corrupted segment that may not have fully flushed to disk safely. The segment is truncated
	// from the first block that is found to be corrupt.
	Repair() error
}

type Iterator interface {
	Next() (bool, error)
	Value() []byte
	Close() error

	// Verify ensures the Iterator can iterate over a continuous series of blocks without error.
	Verify() (int, error)

	Metadata() (SampleType, uint32)
}
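// The function below is an illustrative sketch, not part of the package API: it shows how the
// Segment interface above is typically exercised end to end. The "Logs" prefix and the payload
// are placeholders, and UnknownSampleType is used only because it is the SampleType constant
// referenced elsewhere in this file; real callers would pass their concrete sample type and count.
func exampleSegmentRoundTrip(dir string) (int64, error) {
	seg, err := NewSegment(dir, "Logs", WithFlushIntervale(50*time.Millisecond))
	if err != nil {
		return 0, err
	}
	defer seg.Close()

	// Each Write call becomes one length/CRC prefixed, compressed block in the segment file.
	if _, err := seg.Write(context.Background(), []byte("payload"), WithSampleMetadata(UnknownSampleType, 1)); err != nil {
		return 0, err
	}

	// Flush forces the buffered writer onto the underlying file immediately instead of
	// waiting for the background flusher tick.
	if err := seg.Flush(); err != nil {
		return 0, err
	}

	return seg.Size(), nil
}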
type segment struct {
	// id is the time-ordered ID and allows for segment files to be sorted lexicographically and in time order of
	// creation.
	id        string
	createdAt time.Time
	path      string
	prefix    string

	filePos uint64

	wg sync.WaitGroup
	mu *adxsync.CountingRWMutex

	// w is the underlying segment file on disk.
	w *os.File

	// bw is a buffered writer for w that is flushed to disk in batches.
	bw *bufio.Writer

	closing chan struct{}
	closed  bool

	// sample metadata
	sampleType  uint16
	sampleCount uint16

	flushCh       chan chan error
	flushInterval time.Duration

	// enableFsync enables fsyncing segments when closed. In deployments with many segments and frequent flush
	// intervals, this can increase disk IO and reduce performance. If this is enabled, it may require more nodes
	// to spread the IO load.
	enableFsync bool
}

func NewSegment(dir, prefix string, opts ...SegmentOption) (Segment, error) {
	flakeId := idgen.NextId()
	createdAt, err := flakeutil.ParseFlakeID(flakeId.String())
	if err != nil {
		return nil, err
	}

	fileName := fmt.Sprintf("%s_%s.wal", prefix, flakeId.String())
	if !fs.ValidPath(fileName) {
		return nil, fmt.Errorf("invalid segment filename: %s", fileName)
	}

	// Ensure the filename is just the base name, with no path separators within it.
	baseName := filepath.Base(fileName)
	if baseName != fileName {
		return nil, fmt.Errorf("invalid segment filename: %s", fileName)
	}

	path := filepath.Join(dir, fileName)
	fw, err := os.Create(path)
	if err != nil {
		return nil, err
	}

	if n, err := fw.Write(segmentMagic[:]); err != nil {
		return nil, err
	} else if n != 8 {
		return nil, io.ErrShortWrite
	}

	bf := bwPool.Get(DefaultIOBufSize).(*bufio.Writer)
	bf.Reset(fw)

	f := &segment{
		id:            flakeId.String(),
		prefix:        prefix,
		createdAt:     createdAt.UTC(),
		path:          path,
		w:             fw,
		bw:            bf,
		filePos:       8,
		closing:       make(chan struct{}),
		flushCh:       make(chan chan error),
		flushInterval: 100 * time.Millisecond,
		mu:            adxsync.NewCountingRWMutex(5),
	}

	for _, opt := range opts {
		opt(f)
	}

	f.wg.Add(1)
	go f.flusher()

	return f, nil
}

// IsSegment returns true if the file is a valid segment file.
func IsSegment(path string) bool {
	ext := filepath.Ext(path)
	if ext != ".wal" {
		return false
	}

	ff, err := os.Open(path)
	if err != nil {
		return false
	}
	defer ff.Close()

	// The first 8 bytes are the header, of which the first 6 bytes are the segmentMagic number. The remaining two
	// bytes are not used yet but can indicate version or other information in the future.
	var magicBuf [8]byte
	if n, err := ff.Read(magicBuf[:]); err != nil && !errors.Is(err, io.EOF) {
		return false
	} else if n != 8 || !bytes.Equal(magicBuf[:6], segmentMagic[:6]) {
		return false
	}

	return true
}

func Open(path string) (Segment, error) {
	if !IsSegment(path) {
		return nil, fmt.Errorf("invalid segment file: %s", path)
	}

	fileName := filepath.Base(path)
	fileName = strings.TrimSuffix(fileName, filepath.Ext(fileName))

	i := strings.LastIndex(fileName, "_")
	prefix := fileName[:i]
	id := fileName[i+1:]

	createdAt, err := flakeutil.ParseFlakeID(id)
	if err != nil {
		return nil, err
	}

	fd, err := os.OpenFile(path, os.O_APPEND|os.O_RDWR, 0600)
	if err != nil {
		return nil, fmt.Errorf("open segment: %s: %w", path, err)
	}

	stat, err := fd.Stat()
	if err != nil {
		return nil, err
	}

	bf := bwPool.Get(DefaultIOBufSize).(*bufio.Writer)
	bf.Reset(fd)

	f := &segment{
		id:            id,
		prefix:        prefix,
		createdAt:     createdAt,
		path:          path,
		filePos:       uint64(stat.Size()),
		w:             fd,
		bw:            bf,
		closing:       make(chan struct{}),
		flushCh:       make(chan chan error),
		flushInterval: 100 * time.Millisecond,
		mu:            adxsync.NewCountingRWMutex(5),
	}

	if err := f.Repair(); err != nil {
		return nil, err
	}

	f.wg.Add(1)
	go f.flusher()

	return f, nil
}

// Path returns the path on disk of the segment.
func (s *segment) Path() string {
	return s.path
}

// Reader returns an io.Reader for the segment. The Reader returns segment data, automatically handling segment
// blocks and validation.
func (s *segment) Reader() (io.ReadCloser, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.closed {
		return nil, ErrSegmentClosed
	}

	return NewSegmentReader(s.Path())
}

// CreatedAt returns the time when the segment was created.
func (s *segment) CreatedAt() time.Time {
	return s.createdAt
}

// Size returns the current size of the segment file on disk.
func (s *segment) Size() int64 {
	return int64(atomic.LoadUint64(&s.filePos))
}
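// The function below is an illustrative sketch (a hypothetical helper, not part of the package
// API): it shows how IsSegment and Open are typically combined on startup to recover existing
// segments from a directory. Open also runs Repair, which truncates any partially written
// trailing block before the segment is handed back to the caller.
func openAllSegments(dir string) ([]Segment, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	var segments []Segment
	for _, e := range entries {
		if e.IsDir() {
			continue
		}

		path := filepath.Join(dir, e.Name())
		if !IsSegment(path) {
			// Skip files that lack the .wal extension or the segmentMagic header.
			continue
		}

		seg, err := Open(path)
		if err != nil {
			return nil, err
		}
		segments = append(segments, seg)
	}
	return segments, nil
}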
// ID returns the ID of the segment.
func (s *segment) ID() string {
	return s.id
}

func (s *segment) Info() SegmentInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()

	sz := s.Size()

	return SegmentInfo{
		Prefix:    s.prefix,
		Ulid:      s.id,
		Size:      sz,
		CreatedAt: s.createdAt,
		Path:      s.path,
	}
}

// Iterator returns an iterator to read values written to the segment. Creating an iterator on a segment that is
// still being written is not supported.
func (s *segment) Iterator() (Iterator, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.closed {
		return nil, ErrSegmentClosed
	}

	f, err := os.Open(s.path)
	if err != nil {
		return nil, err
	}
	return NewSegmentIterator(f)
}

// Append appends raw blocks to the segment. This is used for appending blocks that have already been compressed.
// Misuse of this func could lead to data corruption. In general, you probably want to use Write instead.
func (s *segment) Append(ctx context.Context, buf []byte) (int, error) {
	iter, err := NewSegmentIterator(io.NopCloser(bytes.NewReader(buf)))
	if err != nil {
		return 0, err
	}

	// Verify the blocks are valid before appending.
	if n, err := iter.Verify(); err != nil {
		return 0, err
	} else if n == 0 {
		return 0, nil
	}

	if s.mu.TryLock() {
		defer s.mu.Unlock()
	} else {
		return 0, ErrSegmentLocked
	}

	if s.closed {
		return 0, ErrSegmentClosed
	}

	// Strip off the file header and append the blocks to the segment.
	n, err := s.appendBlocks(buf[8:])
	if err != nil {
		return 0, err
	}
	atomic.AddUint64(&s.filePos, uint64(n))

	return n, nil
}

// Write writes buf to the segment.
func (s *segment) Write(ctx context.Context, buf []byte, opts ...WriteOptions) (int, error) {
	s.mu.RLock()
	if s.closed {
		s.mu.RUnlock()
		return 0, ErrSegmentClosed
	}
	s.mu.RUnlock()

	written, err := s.blockWrite(s.bw, buf, opts...)
	if err != nil {
		return 0, err
	}
	atomic.AddUint64(&s.filePos, uint64(written))

	return written, err
}

// Bytes returns the full segment file as a byte slice.
func (s *segment) Bytes() ([]byte, error) {
	f, err := s.Reader()
	if err != nil {
		return nil, err
	}
	defer f.Close()

	return io.ReadAll(f)
}

func (s *segment) Flush() error {
	doneCh := make(chan error)
	s.flushCh <- doneCh
	return <-doneCh
}

// Close closes the segment for writing.
func (s *segment) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.closed {
		return nil
	}

	// Signal the flusher goroutine to exit and mark the segment closed.
	close(s.closing)
	s.closed = true

	// Wait for the flusher goroutine to flush any in-flight writes.
	s.wg.Wait()

	if err := s.bw.Flush(); err != nil {
		return err
	}

	bwPool.Put(s.bw)
	s.bw = nil

	if s.enableFsync {
		if err := s.w.Sync(); errors.Is(err, os.ErrClosed) {
			return nil
		} else if err != nil {
			return err
		}
	}

	return s.w.Close()
}
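// The function below is an illustrative sketch (a hypothetical helper, not part of the package
// API): it shows the kind of caller Append is intended for, namely copying already-compressed
// blocks from an existing segment file into another segment. It assumes Append expects a full
// segment file image, since Append verifies the buffer with a segment iterator and then strips
// the 8 byte file header before appending the raw blocks.
func copySegmentBlocks(ctx context.Context, dst Segment, srcPath string) error {
	// Read the source segment file verbatim, including its file header and block framing.
	raw, err := os.ReadFile(srcPath)
	if err != nil {
		return err
	}

	_, err = dst.Append(ctx, raw)
	return err
}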
// Repair truncates the last bytes in the segment if they are missing, corrupted or have extra data. This
// repairs any corrupted segment that may not have fully flushed to disk safely.
func (s *segment) Repair() error {
	buf := make([]byte, 0, 4096)

	if _, err := s.w.Seek(8, io.SeekStart); err != nil {
		return err
	}

	var (
		lastGoodIdx, idx = 8, 8

		lenCrcBuf [8]byte
	)

	for {
		// Read the block length and CRC.
		n, err := s.w.Read(lenCrcBuf[:8])
		idx += n

		if err == io.EOF {
			return nil
		}

		if err != nil || n != 8 {
			logger.Warnf("Repairing segment %s, missing block header, truncating at %d", s.path, lastGoodIdx)
			return s.truncate(int64(lastGoodIdx))
		}

		blockLen := binary.BigEndian.Uint32(lenCrcBuf[:4])
		if uint32(cap(buf)) < blockLen {
			buf = make([]byte, 0, blockLen)
		}

		crc := binary.BigEndian.Uint32(lenCrcBuf[4:8])

		n, err = s.w.Read(buf[:blockLen])
		idx += n
		if err != nil {
			logger.Warnf("Repairing segment %s, unexpected error %s, truncating at %d", s.path, err, lastGoodIdx)
			return s.truncate(int64(lastGoodIdx))
		}

		if uint32(n) != blockLen {
			logger.Warnf("Repairing segment %s, short block, truncating at %d", s.path, lastGoodIdx)
			return s.truncate(int64(lastGoodIdx))
		}

		if crc32.ChecksumIEEE(buf[:blockLen]) != crc {
			logger.Warnf("Repairing segment %s, checksum failed, truncating at %d", s.path, lastGoodIdx)
			return s.truncate(int64(lastGoodIdx))
		}

		lastGoodIdx = idx
	}
}

func (s *segment) flusher() {
	defer s.wg.Done()

	t := time.NewTicker(s.flushInterval)
	defer t.Stop()

	for {
		select {
		case doneCh := <-s.flushCh:
			var err error
			if s.mu.TryLock() {
				err = s.bw.Flush()
				s.mu.Unlock()
			} else {
				err = fmt.Errorf("segment locked")
			}

			if err != nil {
				logger.Errorf("Failed to flush writer for segment: %s: %s", s.path, err)
			}
			doneCh <- err
		case <-s.closing:
			return
		case <-t.C:
			if s.mu.TryLock() {
				if err := s.bw.Flush(); err != nil {
					logger.Errorf("Failed to flush writer for segment: %s: %s", s.path, err)
				}
				s.mu.Unlock()
			}
		}
	}
}

func (s *segment) appendBlocks(val []byte) (int, error) {
	n, err := s.bw.Write(val)
	if err != nil {
		return 0, err
	} else if n != len(val) {
		return n, io.ErrShortWrite
	}
	return n, nil
}
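// The function below is an illustrative sketch (a hypothetical helper, not part of the package
// API): it walks the same length+CRC block framing that Repair validates, but read-only, and
// reports how many intact blocks a segment file currently holds.
func countValidBlocks(path string) (int, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	// Skip the 8 byte file header (segmentMagic plus reserved bytes).
	if _, err := f.Seek(8, io.SeekStart); err != nil {
		return 0, err
	}

	var (
		count int
		hdr   [8]byte
		block []byte
	)
	for {
		if _, err := io.ReadFull(f, hdr[:]); err != nil {
			// EOF or a partial trailing header ends the scan; everything counted so far is intact.
			return count, nil
		}

		blockLen := binary.BigEndian.Uint32(hdr[:4])
		crc := binary.BigEndian.Uint32(hdr[4:8])

		if uint32(cap(block)) < blockLen {
			block = make([]byte, blockLen)
		}
		block = block[:blockLen]

		if _, err := io.ReadFull(f, block); err != nil {
			// Short block at the tail; Repair would truncate here.
			return count, nil
		}
		if crc32.ChecksumIEEE(block) != crc {
			// Checksum mismatch; Repair would truncate here.
			return count, nil
		}
		count++
	}
}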
// blockWrite writes a length and CRC32 prefixed block to w.
func (s *segment) blockWrite(w io.Writer, buf []byte, opts ...WriteOptions) (int, error) {
	if len(buf) == 0 {
		return 0, nil
	}

	// Each block is constructed as follows:
	//
	// The block is prefixed with a 4 byte length and 4 byte CRC32 checksum which is used
	// to verify the block's integrity.
	//
	// The block body is an s2 (snappy-compatible) encoded byte array consisting of a 2 byte magic number,
	// 1 byte version, 1 byte type, 4 byte count, and N bytes of value. The magic number is used to identify the
	// block as a valid block. The version is used to identify the version of the block. The type is used to
	// identify the type of the block. The count is used to identify the number of samples in the block.
	// The value is the actual data of the block, uncompressed.
	//
	// ┌───────────┬─────────┬───────────┬───────────┬───────────┬───────────┬───────────┐
	// │    Len    │   CRC   │   Magic   │  Version  │   Type    │   Count   │   Value   │
	// │  4 bytes  │ 4 bytes │  2 bytes  │  1 byte   │  1 byte   │  4 bytes  │  N bytes  │
	// └───────────┴─────────┴───────────┴───────────┴───────────┴───────────┴───────────┘
	// ┌─────────────────────┬───────────────────────────────────────────────────────────┐
	// │       Header        │                           Block                           │
	// └─────────────────────┴───────────────────────────────────────────────────────────┘
	//
	// The block header is 8 bytes long and consists of the length and CRC32 checksum of the block.
	// The next 8 bytes are for the magic number, version, type, and count of the block.

	// We use one slice from the buffer pool that is large enough to store the metadata headers
	// and the uncompressed value.
	blockBuf := gbp.Get(8 + len(buf))
	defer gbp.Put(blockBuf)

	// Copy the magic number and version to the buffer.
	copy(blockBuf[0:2], blockHdrMagic[:])
	blockBuf[2] = segmentVersion

	// Default to unknown, no count data for the block.
	blockBuf[3] = byte(UnknownSampleType)
	binary.BigEndian.PutUint32(blockBuf[4:8], 0)

	// Apply the WriteOptions to set sample type and count.
	for _, opt := range opts {
		opt(blockBuf[3:8])
	}

	// Append the block value to the buffer.
	copy(blockBuf[8:], buf)

	// We need a separate buffer to build the block header and compressed value.
	b := gbp.Get(8 + s2.MaxEncodedLen(len(blockBuf)))
	defer gbp.Put(b)

	// Encode the block header and value.
	compressedBytes := s2.EncodeBetter(b[8:], blockBuf)
	binary.BigEndian.PutUint32(b[0:4], uint32(len(compressedBytes)))
	binary.BigEndian.PutUint32(b[4:8], crc32.ChecksumIEEE(compressedBytes))
	b = b[:8+len(compressedBytes)]

	s.mu.Lock()
	defer s.mu.Unlock()

	if s.closed {
		return 0, ErrSegmentClosed
	}

	n, err := w.Write(b)
	if err != nil {
		return 0, err
	} else if n != len(b) {
		return 0, io.ErrShortWrite
	}

	return n, nil
}

func (s *segment) truncate(ofs int64) error {
	if err := s.w.Truncate(ofs); err != nil {
		return err
	}
	if err := s.w.Sync(); err != nil {
		return err
	}
	atomic.StoreUint64(&s.filePos, uint64(ofs))
	return nil
}

// WithSampleMetadata sets the sample type and count in a block's metadata header.
func WithSampleMetadata(t SampleType, count uint32) WriteOptions {
	return func(b []byte) {
		if len(b) < 5 {
			return
		}
		b[0] = byte(t)
		binary.BigEndian.PutUint32(b[1:5], count)
	}
}

// SampleMetadata returns the sample type and count stored in a block's metadata header.
func SampleMetadata(b []byte) (t SampleType, count uint32) {
	if len(b) < 5 {
		return
	}
	t = SampleType(b[0])
	count = binary.BigEndian.Uint32(b[1:5])
	return
}

// HasSampleMetadata returns true if the block body begins with the block header magic number,
// indicating that sample metadata is present.
func HasSampleMetadata(b []byte) bool {
	if len(b) < 2 {
		return false
	}
	return bytes.Equal(b[0:2], blockHdrMagic[:])
}
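// The function below is an illustrative sketch (a hypothetical helper, not part of the package
// API): it decodes a single block body (the bytes that follow the 8 byte length/CRC header in
// the file) by s2-decompressing it and reading the metadata layout documented in blockWrite.
// Blocks written without the metadata header are detected via HasSampleMetadata and returned
// with UnknownSampleType and a zero count.
func decodeBlockBody(compressed []byte) (SampleType, uint32, []byte, error) {
	decoded, err := s2.Decode(nil, compressed)
	if err != nil {
		return UnknownSampleType, 0, nil, err
	}

	if len(decoded) < 8 || !HasSampleMetadata(decoded) {
		// No metadata header present; the whole body is the value.
		return UnknownSampleType, 0, decoded, nil
	}

	// Skip the 2 byte magic and 1 byte version; SampleMetadata reads the type and count.
	t, count := SampleMetadata(decoded[3:8])
	return t, count, decoded[8:], nil
}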