pkg/wal/iterator.go (115 lines of code) (raw):
package wal
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"github.com/klauspost/compress/s2"
)
// segmentIterator is an iterator for a segment file. It allows reading back values written to the segment in the
// same order they were written.
type segmentIterator struct {
// br is the underlying segment file on disk.
br *bufio.Reader
f io.ReadCloser
// n is the index into buf that allows iterating through values in a block.
n int
// buf is last read block from disk. This block holds multiple values corresponding
// to segment writes.
buf []byte
// value is the current value that is returned for the iterator.
value []byte
// lenCrcBuf is a temp buffer to re-use for extracting the 8 byte (4 len, 4 crc) values
// when iterating.
lenCrcBuf [8]byte
sampleType SampleType
sampleCount uint32
// decodeBuf is a temp buffer to re-use for decoding the block.
decodeBuf []byte
}
func NewSegmentIterator(r io.ReadCloser) (Iterator, error) {
var magicBuf [8]byte
if _, err := io.ReadFull(r, magicBuf[:]); err != nil {
return nil, err
} else if !bytes.Equal(magicBuf[:6], segmentMagic[:6]) {
return nil, ErrInvalidWALSegment
}
return &segmentIterator{
f: r,
br: bufio.NewReader(r),
n: 0,
buf: make([]byte, 0, 4096),
decodeBuf: make([]byte, 0, 4096),
value: nil,
}, nil
}
func (b *segmentIterator) Next() (bool, error) {
// Read the block length and CRC
n, err := io.ReadFull(b.br, b.lenCrcBuf[:8])
if err == io.EOF {
return false, err
} else if err != nil || n != 8 {
return false, nil
}
// Extract the block length and expand the read buffer if it is too small.
blockLen := binary.BigEndian.Uint32(b.lenCrcBuf[:4])
if uint32(cap(b.buf)) < blockLen {
b.buf = make([]byte, 0, blockLen)
}
// If the block length is 0, then we may have some trailing 0 bytes that we can ignore.
// This segment could be repaired, but a Repair would just truncate this data, so we ignore
// it anyway.
if blockLen == 0 {
return false, nil
}
// Extract the CRC value for the block
crc := binary.BigEndian.Uint32(b.lenCrcBuf[4:8])
// Read the expected block length bytes
n, err = io.ReadFull(b.br, b.buf[:blockLen])
if err != nil {
return false, err
}
// Make sure we actually read the number of bytes we were expecting.
if uint32(n) != blockLen {
return false, fmt.Errorf("short block read: expected %d, got %d", blockLen, n)
}
// Validate the block checksum matches still
if crc32.ChecksumIEEE(b.buf[:blockLen]) != crc {
return false, fmt.Errorf("block checksum verification failed")
}
b.decodeBuf, err = s2.Decode(b.decodeBuf[:0], b.buf[:blockLen])
if err != nil {
return false, err
}
if HasSampleMetadata(b.decodeBuf) {
st, sc := SampleMetadata(b.decodeBuf[3:8])
b.sampleType = st
b.sampleCount += sc
b.value = b.decodeBuf[8:]
} else {
b.value = b.decodeBuf
}
return len(b.value) > 0, nil
}
func (b *segmentIterator) Value() []byte {
return b.value
}
func (b *segmentIterator) Close() error {
return b.f.Close()
}
// Verify iterates through the entire segment and verifies the checksums for each block. It stops iterating
// when it reaches the end of the file or encounters a block that could be repaired/dropped. If this does
// not return an error, then the segment can be safely read continuously to walk valid blocks. The iterator must be
// re-created after calling this method.
func (b *segmentIterator) Verify() (int, error) {
var blocks int
for {
// Read the block length and CRC
n, err := io.ReadFull(b.br, b.lenCrcBuf[:8])
if err == io.EOF {
return blocks, nil
} else if err != nil || n != 8 {
// We don't have a full block, if this segment was repaired, we would not see this. Instead of returning
// an error, just stop iteration and assume we've reached the end of the segment.
return blocks, nil
}
// Extract the block length and expand the read buffer if it is too small.
blockLen := binary.BigEndian.Uint32(b.lenCrcBuf[:4])
if uint32(cap(b.buf)) < blockLen {
b.buf = make([]byte, 0, blockLen)
}
// Special case where trailing zeros may exist at the end of the file. We dont' have a valid block, so just
// stop iteration and assume we've reached the end of the segment.
if blockLen == 0 {
return blocks, nil
}
// Extract the CRC value for the block
crc := binary.BigEndian.Uint32(b.lenCrcBuf[4:8])
// Read the expected block length bytes
n, err = io.ReadFull(b.br, b.buf[:blockLen])
if err != nil {
return 0, err
}
// Make sure we actually read the number of bytes we were expecting.
if uint32(n) != blockLen {
return 0, fmt.Errorf("short block read: expected %d, got %d", blockLen, n)
}
// Validate the block checksum matches still
if crc32.ChecksumIEEE(b.buf[:blockLen]) != crc {
return 0, fmt.Errorf("block checksum verification failed")
}
blocks++
}
}
func (b *segmentIterator) Metadata() (t SampleType, sampleCount uint32) {
return b.sampleType, b.sampleCount
}