internal/gitaly/storage/storagemgr/partition/log/positions.go (59 lines of code) (raw):
// positions.go
package log
import (
"fmt"
"sync/atomic"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
)
var (
// AppliedPosition keeps track of the latest applied position. WAL cannot prune a log entry if it has not been applied.
AppliedPosition = storage.PositionType{Name: "AppliedPosition", ShouldNotify: false}
// ConsumerPosition keeps track of the latest consumer acknowledgment.
ConsumerPosition = storage.PositionType{Name: "ConsumerPosition", ShouldNotify: true}
)
// position tracks the last LSN acknowledged for a particular type.
type position struct {
atomic.Value
}
func newPosition() *position {
p := position{}
p.setPosition(0)
return &p
}
func (p *position) getPosition() storage.LSN {
return p.Load().(storage.LSN)
}
func (p *position) setPosition(pos storage.LSN) {
p.Store(pos)
}
// PositionTracker manages positions for various position types.
type PositionTracker struct {
positions map[string]*position
}
// NewPositionTracker creates and initializes a new PositionTracker.
func NewPositionTracker() *PositionTracker {
return &PositionTracker{
positions: map[string]*position{
AppliedPosition.Name: newPosition(),
},
}
}
// Register adds a new position type to the tracker.
func (p *PositionTracker) Register(t storage.PositionType) error {
if _, exist := p.positions[t.Name]; exist {
return fmt.Errorf("position type %q already registered", t.Name)
}
p.positions[t.Name] = newPosition()
return nil
}
// Set updates the position for a given type.
func (p *PositionTracker) Set(t string, lsn storage.LSN) error {
if _, exist := p.positions[t]; !exist {
return fmt.Errorf("acknowledged an unregistered position type %q", t)
}
p.positions[t].setPosition(lsn)
return nil
}
// Get retrieves the position for a given type.
func (p *PositionTracker) Get(t string) (storage.LSN, error) {
if _, exist := p.positions[t]; !exist {
return 0, fmt.Errorf("acknowledged an unregistered position type %q", t)
}
return p.positions[t].getPosition(), nil
}
// Each iterates through the list of tracked positions and yields the callback with corresponding LSN.
func (p *PositionTracker) Each(callback func(string, storage.LSN)) {
for t, pos := range p.positions {
callback(t, pos.getPosition())
}
}