pkg/wal/index.go (201 lines of code) (raw):
package wal
import (
"fmt"
"io"
"sort"
"sync"
"sync/atomic"
"text/tabwriter"
"time"
)
// Index provides an overview of all segments in a repository, grouped by
// prefix. Construct it with NewIndex; the zero value has a nil segment map.
// Every method either holds mu or accesses totalSize atomically.
type Index struct {
	// totalSize is read and written with sync/atomic. It must stay the first
	// field: on 386, ARM and 32-bit MIPS, 64-bit atomic operations require
	// 64-bit alignment, which is only guaranteed for the first word of an
	// allocated struct. Placed after the RWMutex and map it would be
	// misaligned there and the atomic calls would panic.
	totalSize int64

	// mu guards segments.
	mu sync.RWMutex
	// segments maps a prefix to the segments recorded under it.
	segments map[string][]SegmentInfo
}
// NewIndex creates an empty Index whose segment map is initialized and ready
// for Add.
func NewIndex() *Index {
	idx := new(Index)
	idx.segments = map[string][]SegmentInfo{}
	return idx
}
// Add records s under s.Prefix and adds s.Size to the running total size.
// Duplicate segments (same Path) are not detected; each call appends a new
// entry.
func (i *Index) Add(s SegmentInfo) {
	i.mu.Lock()
	defer i.mu.Unlock()

	atomic.AddInt64(&i.totalSize, s.Size)
	existing := i.segments[s.Prefix]
	i.segments[s.Prefix] = append(existing, s)
}
// Get copies the segments recorded for prefix into infos and returns the
// result. The backing array of infos is reused when its capacity allows;
// otherwise append allocates a fresh one. The returned slice never shares
// storage with the index, so callers may modify it freely.
func (i *Index) Get(infos []SegmentInfo, prefix string) []SegmentInfo {
	i.mu.RLock()
	defer i.mu.RUnlock()

	stored := i.segments[prefix]
	out := infos[:0]
	out = append(out, stored...)
	return out
}
// Remove deletes the segment whose Path equals s.Path from the list stored
// under s.Prefix. Only the first match is removed, and removing a segment
// that is not present is a no-op.
//
// The total size is reduced by the Size recorded when the segment was added,
// not by s.Size. This keeps TotalSize consistent with what Add accumulated even
// if the caller's SegmentInfo carries a stale or zero Size. When the last
// segment of a prefix is removed, the prefix itself is dropped from the index.
func (i *Index) Remove(s SegmentInfo) {
	i.mu.Lock()
	defer i.mu.Unlock()

	segments := i.segments[s.Prefix]
	for idx := range segments {
		if segments[idx].Path != s.Path {
			continue
		}
		atomic.AddInt64(&i.totalSize, -segments[idx].Size)

		last := len(segments) - 1
		copy(segments[idx:], segments[idx+1:])
		// Zero the vacated tail slot so the backing array (which is reused by
		// later appends) does not keep the removed segment reachable.
		var zero SegmentInfo
		segments[last] = zero
		segments = segments[:last]

		if len(segments) == 0 {
			delete(i.segments, s.Prefix)
		} else {
			i.segments[s.Prefix] = segments
		}
		return
	}
}
// OldestPrefix returns the prefix of the segment with the earliest CreatedAt.
// When several segments share that timestamp, the lexicographically smallest
// prefix wins, so the result does not depend on map iteration order.
// It returns "" for an empty index.
//
// A zero CreatedAt is treated as "not yet chosen": such a segment is replaced
// by the next segment visited.
func (i *Index) OldestPrefix() string {
	i.mu.RLock()
	defer i.mu.RUnlock()

	var oldest SegmentInfo
	for _, segments := range i.segments {
		for _, seg := range segments {
			switch {
			case oldest.CreatedAt.IsZero(), seg.CreatedAt.Before(oldest.CreatedAt):
				oldest = seg
			case seg.CreatedAt.Equal(oldest.CreatedAt) && seg.Prefix < oldest.Prefix:
				// Deterministic tie-break on equal timestamps.
				oldest = seg
			}
		}
	}
	return oldest.Prefix
}
// LargestSizePrefix returns the prefix whose segments have the largest
// combined Size. Ties are broken by choosing the lexicographically smallest
// prefix, so the result is deterministic. It returns "" for an empty index.
func (i *Index) LargestSizePrefix() string {
	i.mu.RLock()
	defer i.mu.RUnlock()

	var (
		best     string
		bestSize int64
		found    bool
	)
	for prefix, segments := range i.segments {
		var sum int64
		for _, seg := range segments {
			sum += seg.Size
		}
		// An explicit found flag (rather than best == "") ensures a legitimate
		// empty-string prefix is not mistaken for "nothing chosen yet".
		if !found || sum > bestSize || (sum == bestSize && prefix < best) {
			best, bestSize, found = prefix, sum, true
		}
	}
	return best
}
// LargestCountPrefix returns the prefix that holds the most segments.
//
// Ties on count are broken by the prefix whose earliest segment CreatedAt is
// older. Remaining ties are broken by the lexicographically smallest prefix,
// so the result is deterministic. It returns "" for an empty index.
func (i *Index) LargestCountPrefix() string {
	i.mu.RLock()
	defer i.mu.RUnlock()

	var (
		best      string
		bestCount int
		bestAge   time.Time
		found     bool
	)
	for prefix, segments := range i.segments {
		// age is the earliest CreatedAt among this prefix's segments.
		var age time.Time
		for _, seg := range segments {
			if age.IsZero() || seg.CreatedAt.Before(age) {
				age = seg.CreatedAt
			}
		}

		n := len(segments)
		// An explicit found flag (rather than best == "") ensures a legitimate
		// empty-string prefix is not mistaken for "nothing chosen yet".
		better := !found || n > bestCount ||
			(n == bestCount && (age.Before(bestAge) ||
				(age.Equal(bestAge) && prefix < best)))
		if better {
			best, bestCount, bestAge, found = prefix, n, age, true
		}
	}
	return best
}
// TotalSegments reports how many segments are recorded across every prefix.
func (i *Index) TotalSegments() int {
	i.mu.RLock()
	total := 0
	for _, segs := range i.segments {
		total += len(segs)
	}
	i.mu.RUnlock()
	return total
}
// TotalPrefixes reports how many distinct prefixes currently have at least
// one entry in the index.
func (i *Index) TotalPrefixes() int {
	i.mu.RLock()
	n := len(i.segments)
	i.mu.RUnlock()
	return n
}
// PrefixesBySize returns all prefixes sorted by the combined Size of their
// segments, smallest first. Prefixes with equal size are ordered
// lexicographically, so the result is deterministic (sort.Slice is not stable,
// and map iteration order is random). It returns nil for an empty index.
func (i *Index) PrefixesBySize() []string {
	i.mu.RLock()
	defer i.mu.RUnlock()

	if len(i.segments) == 0 {
		return nil
	}
	prefixes := make([]string, 0, len(i.segments))
	sizes := make(map[string]int64, len(i.segments))
	for prefix, segments := range i.segments {
		prefixes = append(prefixes, prefix)
		var sum int64
		for _, seg := range segments {
			sum += seg.Size
		}
		sizes[prefix] = sum
	}

	sort.Slice(prefixes, func(a, b int) bool {
		pa, pb := prefixes[a], prefixes[b]
		if sizes[pa] != sizes[pb] {
			return sizes[pa] < sizes[pb]
		}
		return pa < pb
	})
	return prefixes
}
// PrefixesByAge returns all prefixes sorted from oldest to newest. A prefix's
// age is the earliest CreatedAt among its segments. Prefixes with the same age
// are ordered lexicographically, so the result is deterministic. It returns
// nil for an empty index.
func (i *Index) PrefixesByAge() []string {
	i.mu.RLock()
	defer i.mu.RUnlock()

	if len(i.segments) == 0 {
		return nil
	}
	prefixes := make([]string, 0, len(i.segments))
	ages := make(map[string]time.Time, len(i.segments))
	for prefix, segments := range i.segments {
		prefixes = append(prefixes, prefix)
		var oldest time.Time
		for _, seg := range segments {
			if oldest.IsZero() || seg.CreatedAt.Before(oldest) {
				oldest = seg.CreatedAt
			}
		}
		ages[prefix] = oldest
	}

	sort.Slice(prefixes, func(a, b int) bool {
		ageA, ageB := ages[prefixes[a]], ages[prefixes[b]]
		if !ageA.Equal(ageB) {
			return ageA.Before(ageB)
		}
		return prefixes[a] < prefixes[b]
	})
	return prefixes
}
// PrefixesByCount returns every prefix ordered by how many segments it holds,
// fewest first. Prefixes with the same number of segments are ordered
// lexicographically, so the ordering is total and deterministic.
func (i *Index) PrefixesByCount() []string {
	i.mu.RLock()
	defer i.mu.RUnlock()

	var names []string
	counts := make(map[string]int)
	for name, segs := range i.segments {
		names = append(names, name)
		counts[name] = len(segs)
	}

	sort.Slice(names, func(a, b int) bool {
		ca, cb := counts[names[a]], counts[names[b]]
		if ca != cb {
			return ca < cb
		}
		return names[a] < names[b]
	})
	return names
}
// TotalSize returns the running total of segment sizes maintained by Add and
// Remove. It reads the counter atomically and does not take the index lock.
func (i *Index) TotalSize() int64 {
	size := atomic.LoadInt64(&i.totalSize)
	return size
}
// WriteDebug writes a human-readable summary of the index to w.
//
// The output is a header line with disk usage (TotalSize), segment count and
// prefix count, followed by a table. The table has one row per prefix showing
// its segment count, combined size, and the CreatedAt of the first segment
// recorded for it.
//
// The table is built from a single snapshot taken under the read lock, so the
// counts in the header agree with the rows. The lock is not held while writing
// to w. Rows are sorted by prefix so the output is deterministic. The first
// write error is returned, wrapped with context.
func (i *Index) WriteDebug(w io.Writer) error {
	type row struct {
		prefix    string
		segments  int
		size      int64
		createdAt time.Time
	}

	i.mu.RLock()
	rows := make([]row, 0, len(i.segments))
	totalSegments := 0
	for prefix, segments := range i.segments {
		r := row{prefix: prefix, segments: len(segments)}
		for _, seg := range segments {
			r.size += seg.Size
		}
		if len(segments) > 0 {
			r.createdAt = segments[0].CreatedAt
		}
		totalSegments += len(segments)
		rows = append(rows, r)
	}
	i.mu.RUnlock()

	sort.Slice(rows, func(a, b int) bool { return rows[a].prefix < rows[b].prefix })

	if _, err := fmt.Fprintf(w, "Index: Disk Usage: %d, Segments: %d, Prefixes: %d\n\n", i.TotalSize(), totalSegments, len(rows)); err != nil {
		return fmt.Errorf("writing index summary: %w", err)
	}

	tw := tabwriter.NewWriter(w, 4, 0, 2, ' ', 0)
	if _, err := io.WriteString(tw, "Prefix\tSegments\tSize\tCreatedAt\n"); err != nil {
		return fmt.Errorf("writing index table header: %w", err)
	}
	for _, r := range rows {
		if _, err := fmt.Fprintf(tw, "%s\t%d\t%d\t%s\n", r.prefix, r.segments, r.size, r.createdAt.Format(time.RFC3339)); err != nil {
			return fmt.Errorf("writing index row %q: %w", r.prefix, err)
		}
	}
	if err := tw.Flush(); err != nil {
		return fmt.Errorf("flushing index table: %w", err)
	}

	if _, err := io.WriteString(w, "\n"); err != nil {
		return fmt.Errorf("writing index trailer: %w", err)
	}
	return nil
}