internal/praefect/datastore/memory.go (308 lines of code) (raw):

package datastore import ( "bytes" "context" "encoding/json" "errors" "fmt" "sync" "time" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" ) var errDeadAckedAsFailed = errors.New("job acknowledged as failed with no attempts left, should be 'dead'") // NewMemoryReplicationEventQueue return in-memory implementation of the ReplicationEventQueue. func NewMemoryReplicationEventQueue(conf config.Config) ReplicationEventQueue { storageNamesByVirtualStorage := make(map[string][]string, len(conf.VirtualStorages)) for _, vs := range conf.VirtualStorages { storages := make([]string, len(vs.Nodes)) for i, node := range vs.Nodes { storages[i] = node.Storage } storageNamesByVirtualStorage[vs.Name] = storages } return &memoryReplicationEventQueue{ dequeued: map[uint64]struct{}{}, storageNamesByVirtualStorage: storageNamesByVirtualStorage, lastEventByDest: map[eventDestination]ReplicationEvent{}, } } type eventDestination struct { virtual, storage, relativePath string } // memoryReplicationEventQueue implements queue interface with in-memory implementation of storage type memoryReplicationEventQueue struct { sync.RWMutex seq uint64 // used to generate unique identifiers for events queued []ReplicationEvent // all new events stored as queue dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged storageNamesByVirtualStorage map[string][]string // bindings between virtual storage and storages behind them lastEventByDest map[eventDestination]ReplicationEvent // contains 'virtual+storage+repo' => 'last even' mappings } // nextID returns a new sequential ID for new events. // Needs to be called with lock protection. 
func (s *memoryReplicationEventQueue) nextID() uint64 { s.seq++ return s.seq } func (s *memoryReplicationEventQueue) Enqueue(_ context.Context, event ReplicationEvent) (ReplicationEvent, error) { event.Attempt = 3 event.State = JobStateReady event.CreatedAt = time.Now().UTC() // event.LockID is unnecessary with an in memory data store as it is intended to synchronize multiple praefect instances // but must be filled out to produce same event as it done by SQL implementation event.LockID = event.Job.VirtualStorage + "|" + event.Job.TargetNodeStorage + "|" + event.Job.RelativePath dest := s.defineDest(event) s.Lock() defer s.Unlock() event.ID = s.nextID() s.queued = append(s.queued, event) s.lastEventByDest[dest] = event return event, nil } func (s *memoryReplicationEventQueue) Dequeue(_ context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) { s.Lock() defer s.Unlock() var result []ReplicationEvent uniqueJob := make(map[string]struct{}) for i := 0; i < len(s.queued); i++ { event := s.queued[i] isForVirtualStorage := event.Job.VirtualStorage == virtualStorage isForTargetStorage := event.Job.TargetNodeStorage == nodeStorage isReadyOrFailed := event.State == JobStateReady || event.State == JobStateFailed if isForVirtualStorage && isForTargetStorage && isReadyOrFailed { jobData, err := json.Marshal(event.Job) if err != nil { return nil, err } if _, found := uniqueJob[string(jobData)]; found { continue } uniqueJob[string(jobData)] = struct{}{} updatedAt := time.Now().UTC() event.Attempt-- event.State = JobStateInProgress event.UpdatedAt = &updatedAt s.queued[i] = event s.dequeued[event.ID] = struct{}{} eventDest := s.defineDest(event) if last, found := s.lastEventByDest[eventDest]; found && last.ID == event.ID { s.lastEventByDest[eventDest] = event } result = append(result, event) if len(result) >= count { break } } } return result, nil } func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobState, ids []uint64) 
([]uint64, error) { if len(ids) == 0 { return nil, nil } if err := allowToAck(state); err != nil { return nil, err } s.Lock() defer s.Unlock() var result []uint64 for _, id := range ids { if _, found := s.dequeued[id]; !found { // event was not dequeued from the queue, so it can't be acknowledged continue } for i := 0; i < len(s.queued); i++ { if s.queued[i].ID != id { continue } if s.queued[i].State != JobStateInProgress { return nil, fmt.Errorf("event not in progress, can't be acknowledged: %d [%s]", s.queued[i].ID, s.queued[i].State) } if s.queued[i].Attempt == 0 && state == JobStateFailed { return nil, errDeadAckedAsFailed } dequeuedAt := s.queued[i].UpdatedAt updatedAt := time.Now().UTC() s.queued[i].State = state s.queued[i].UpdatedAt = &updatedAt eventDest := s.defineDest(s.queued[i]) if last, found := s.lastEventByDest[eventDest]; found && last.ID == s.queued[i].ID { s.lastEventByDest[eventDest] = s.queued[i] } result = append(result, id) if state == JobStateCompleted { ackJobData, err := json.Marshal(s.queued[i].Job) if err != nil { return nil, err } for j := i + 1; j < len(s.queued); j++ { if dequeuedAt.Before(s.queued[j].CreatedAt) { break } sameJobData, err := json.Marshal(s.queued[j].Job) if err != nil { return nil, err } if bytes.Equal(ackJobData, sameJobData) { s.remove(j) } } } switch state { case JobStateCompleted, JobStateDead: // this event is fully processed and could be removed s.remove(i) } break } } return result, nil } // StartHealthUpdate does nothing as it has no sense in terms of in-memory implementation as // all information about events will be lost after restart. 
func (s *memoryReplicationEventQueue) StartHealthUpdate(context.Context, <-chan time.Time, []ReplicationEvent) error { return nil } func (s *memoryReplicationEventQueue) AcknowledgeStale(context.Context, time.Duration) (int64, error) { // this implementation has no problem of stale replication events as it has no information about // job processing after restart of the application return 0, nil } // remove deletes i-th element from the queue and from the in-flight tracking map. // It doesn't check 'i' for the out of range and must be called with lock protection. func (s *memoryReplicationEventQueue) remove(i int) { delete(s.dequeued, s.queued[i].ID) s.queued = append(s.queued[:i], s.queued[i+1:]...) } func (s *memoryReplicationEventQueue) defineDest(event ReplicationEvent) eventDestination { return eventDestination{virtual: event.Job.VirtualStorage, storage: event.Job.TargetNodeStorage, relativePath: event.Job.RelativePath} } // NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface. func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) *ReplicationEventQueueInterceptor { return &ReplicationEventQueueInterceptor{ ReplicationEventQueue: queue, } } // DequeParams is the list of parameters used for Dequeue method call. type DequeParams struct { VirtualStorage, NodeStorage string Count int } // AcknowledgeParams is the list of parameters used for Acknowledge method call. type AcknowledgeParams struct { State JobState IDs []uint64 } // ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface. // It also provides additional methods to get info about incoming and outgoing data from the underling // queue. // NOTE: it should be used for testing purposes only as it persists data in memory and doesn't clean it up. 
type ReplicationEventQueueInterceptor struct {
	// mtx guards the recorded call history (the slices at the bottom of the struct).
	mtx sync.Mutex
	// ReplicationEventQueue is the wrapped queue; calls are delegated to it when
	// no callback is registered for the invoked method.
	ReplicationEventQueue
	// Optional callbacks; when set, a callback is invoked instead of the wrapped queue's method.
	onEnqueue           func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)
	onDequeue           func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)
	onAcknowledge       func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)
	onStartHealthUpdate func(context.Context, <-chan time.Time, []ReplicationEvent) error
	onAcknowledgeStale  func(context.Context, time.Duration) (int64, error)

	// Recorded arguments and results of Enqueue, Dequeue and Acknowledge calls,
	// appended in call order and never cleaned up.
	enqueue           []ReplicationEvent
	enqueueResult     []ReplicationEvent
	dequeue           []DequeParams
	dequeueResult     [][]ReplicationEvent
	acknowledge       []AcknowledgeParams
	acknowledgeResult [][]uint64
}

// OnEnqueue sets the action that is executed each time the `Enqueue` method is called.
// The action runs instead of the underlying queue's method and receives that queue as its last argument.
func (i *ReplicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) {
	i.onEnqueue = action
}

// OnDequeue sets the action that is executed each time the `Dequeue` method is called.
// The action runs instead of the underlying queue's method and receives that queue as its last argument.
func (i *ReplicationEventQueueInterceptor) OnDequeue(action func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) {
	i.onDequeue = action
}

// OnAcknowledge sets the action that is executed each time the `Acknowledge` method is called.
// The action runs instead of the underlying queue's method and receives that queue as its last argument.
func (i *ReplicationEventQueueInterceptor) OnAcknowledge(action func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) {
	i.onAcknowledge = action
}

// OnStartHealthUpdate sets the action that is executed each time the `StartHealthUpdate` method is called.
// The action runs instead of the underlying queue's method.
func (i *ReplicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error) {
	i.onStartHealthUpdate = action
}

// OnAcknowledgeStale sets the action that is executed each time the `AcknowledgeStale` method is called.
func (i *ReplicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) (int64, error)) {
	i.onAcknowledgeStale = action
}

// Enqueue records the incoming event, then hands the call over to the registered callback
// or, if there is none, to the underlying queue. The outcome is recorded as well before
// it is returned to the caller.
func (i *ReplicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
	i.mtx.Lock()
	i.enqueue = append(i.enqueue, event)
	i.mtx.Unlock()

	var (
		stored ReplicationEvent
		err    error
	)

	if i.onEnqueue == nil {
		stored, err = i.ReplicationEventQueue.Enqueue(ctx, event)
	} else {
		stored, err = i.onEnqueue(ctx, event, i.ReplicationEventQueue)
	}

	i.mtx.Lock()
	i.enqueueResult = append(i.enqueueResult, stored)
	i.mtx.Unlock()

	return stored, err
}

// Dequeue records the arguments of the call, then hands the call over to the registered callback
// or, if there is none, to the underlying queue. The returned events are recorded as well before
// they are returned to the caller.
func (i *ReplicationEventQueueInterceptor) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) {
	params := DequeParams{VirtualStorage: virtualStorage, NodeStorage: nodeStorage, Count: count}

	i.mtx.Lock()
	i.dequeue = append(i.dequeue, params)
	i.mtx.Unlock()

	var (
		events []ReplicationEvent
		err    error
	)

	if i.onDequeue == nil {
		events, err = i.ReplicationEventQueue.Dequeue(ctx, virtualStorage, nodeStorage, count)
	} else {
		events, err = i.onDequeue(ctx, virtualStorage, nodeStorage, count, i.ReplicationEventQueue)
	}

	i.mtx.Lock()
	i.dequeueResult = append(i.dequeueResult, events)
	i.mtx.Unlock()

	return events, err
}

// Acknowledge intercepts the call to the Acknowledge method of the underlying implementation or a callback.
// It records the incoming and outgoing parameters before and after the method call.
func (i *ReplicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) { i.mtx.Lock() i.acknowledge = append(i.acknowledge, AcknowledgeParams{State: state, IDs: ids}) i.mtx.Unlock() var ackIDs []uint64 var err error if i.onAcknowledge != nil { ackIDs, err = i.onAcknowledge(ctx, state, ids, i.ReplicationEventQueue) } else { ackIDs, err = i.ReplicationEventQueue.Acknowledge(ctx, state, ids) } i.mtx.Lock() i.acknowledgeResult = append(i.acknowledgeResult, ackIDs) i.mtx.Unlock() return ackIDs, err } // StartHealthUpdate intercepts call to the StartHealthUpdate method of the underling implementation or a call back. func (i *ReplicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error { if i.onStartHealthUpdate != nil { return i.onStartHealthUpdate(ctx, trigger, events) } return i.ReplicationEventQueue.StartHealthUpdate(ctx, trigger, events) } // AcknowledgeStale intercepts call to the AcknowledgeStale method of the underling implementation or a call back. func (i *ReplicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) (int64, error) { if i.onAcknowledgeStale != nil { return i.onAcknowledgeStale(ctx, staleAfter) } return i.ReplicationEventQueue.AcknowledgeStale(ctx, staleAfter) } // GetEnqueued returns a list of events used for Enqueue method or a call-back invocation. func (i *ReplicationEventQueueInterceptor) GetEnqueued() []ReplicationEvent { i.mtx.Lock() defer i.mtx.Unlock() return i.enqueue } // GetEnqueuedResult returns a list of events returned by Enqueue method or a call-back invocation. func (i *ReplicationEventQueueInterceptor) GetEnqueuedResult() []ReplicationEvent { i.mtx.Lock() defer i.mtx.Unlock() return i.enqueueResult } // GetDequeued returns a list of parameters used for Dequeue method or a call-back invocation. 
func (i *ReplicationEventQueueInterceptor) GetDequeued() []DequeParams { i.mtx.Lock() defer i.mtx.Unlock() return i.dequeue } // GetDequeuedResult returns a list of events returned after Dequeue method or a call-back invocation. func (i *ReplicationEventQueueInterceptor) GetDequeuedResult() [][]ReplicationEvent { i.mtx.Lock() defer i.mtx.Unlock() return i.dequeueResult } // GetAcknowledge returns a list of parameters used for Acknowledge method or a call-back invocation. func (i *ReplicationEventQueueInterceptor) GetAcknowledge() []AcknowledgeParams { i.mtx.Lock() defer i.mtx.Unlock() return i.acknowledge } // GetAcknowledgeResult returns a list of results returned after Acknowledge method or a call-back invocation. func (i *ReplicationEventQueueInterceptor) GetAcknowledgeResult() [][]uint64 { i.mtx.Lock() defer i.mtx.Unlock() return i.acknowledgeResult } // Wait checks the condition in a loop with await until it returns true or deadline is exceeded. // The error is returned only in case the deadline is exceeded. func (i *ReplicationEventQueueInterceptor) Wait(deadline time.Duration, condition func(i *ReplicationEventQueueInterceptor) bool) error { dead := time.Now().Add(deadline) for !condition(i) { if dead.Before(time.Now()) { return context.DeadlineExceeded } time.Sleep(time.Millisecond * 100) } return nil }