internal/gitaly/storage/raftmgr/replica_event_registry.go (87 lines of code) (raw):
package raftmgr
import (
"sync"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
)
// EventID uniquely identifies an event in the registry.
type EventID uint64
// EventWaiter holds the information required to wait for an event to be committed.
type EventWaiter struct {
ID EventID
LSN storage.LSN
C chan error
}
// ReplicaEventRegistry manages events and their associated waiters, enabling the registration
// and removal of waiters upon event commitment.
type ReplicaEventRegistry struct {
mu sync.Mutex
nextEventID EventID
waiters map[EventID]*EventWaiter
metrics RaftMetrics
}
// NewReplicaEventRegistry initializes and returns a new instance of Registry.
func NewReplicaEventRegistry(metrics RaftMetrics) *ReplicaEventRegistry {
return &ReplicaEventRegistry{
waiters: make(map[EventID]*EventWaiter),
metrics: metrics,
}
}
// Register creates a new Waiter for an upcoming event and returns it.
// It must be called whenever an event is proposed, with the event ID embedded
// in the corresponding Raft message.
func (r *ReplicaEventRegistry) Register() *EventWaiter {
r.mu.Lock()
defer r.mu.Unlock()
r.nextEventID++
waiter := &EventWaiter{
ID: r.nextEventID,
C: make(chan error, 1),
}
r.waiters[r.nextEventID] = waiter
r.updateQueueDepth()
return waiter
}
// AssignLSN assigns LSN to an event. LSN of an event is used to unlock obsolete proposals if Raft detects duplicated
// LSNs but with higher term.
func (r *ReplicaEventRegistry) AssignLSN(id EventID, lsn storage.LSN) {
r.mu.Lock()
defer r.mu.Unlock()
waiter, exists := r.waiters[id]
if !exists {
return
}
waiter.LSN = lsn
}
// UntrackSince untracks all events having LSNs greater than or equal to the input LSN. The input error is assigned to
// impacted events.
func (r *ReplicaEventRegistry) UntrackSince(lsn storage.LSN, err error) {
r.mu.Lock()
defer r.mu.Unlock()
var toRemove []EventID
for id, w := range r.waiters {
if w.LSN >= lsn {
toRemove = append(toRemove, id)
}
}
for _, id := range toRemove {
r.waiters[id].C <- err
close(r.waiters[id].C)
delete(r.waiters, id)
}
r.updateQueueDepth()
}
// UntrackAll untracks all events. The input error is assigned to impacted events.
func (r *ReplicaEventRegistry) UntrackAll(err error) {
r.mu.Lock()
defer r.mu.Unlock()
for _, w := range r.waiters {
w.C <- err
close(w.C)
}
clear(r.waiters)
r.updateQueueDepth()
}
// Untrack closes the channel associated with a given EventID and removes the waiter from the registry once the event is
// committed. This function returns if the registry is still tracking the event.
func (r *ReplicaEventRegistry) Untrack(id EventID) bool {
r.mu.Lock()
defer r.mu.Unlock()
waiter, exists := r.waiters[id]
if !exists {
return false
}
// Close the channel to notify any goroutines waiting on this event.
close(waiter.C)
delete(r.waiters, id)
r.updateQueueDepth()
return true
}
func (r *ReplicaEventRegistry) updateQueueDepth() {
if r.metrics.proposalQueueDepth != nil {
r.metrics.proposalQueueDepth.Set(float64(len(r.waiters)))
}
}