plugins/targetlocker/inmemory/inmemory.go (197 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. // Package inmemory implements an in-memory target locker. // WARNING: since locking is done in memory, locally to the ConTest server, this // will not prevent other ConTest servers to know that the target is in use. package inmemory import ( "fmt" "time" "github.com/benbjohnson/clock" "github.com/facebookincubator/contest/pkg/target" "github.com/facebookincubator/contest/pkg/types" "github.com/facebookincubator/contest/pkg/xcontext" ) // Name is the name used to look this plugin up. var Name = "InMemory" type request struct { ctx xcontext.Context targets []*target.Target // requireLocked specifies whether targets must be already locked, used by refresh. requireLocked bool // allowConflicts enabled tryLock semantics: Return what can be locked, but // don't error on conflicts allowConflicts bool // owner is the owner of the lock, relative to the lock operation request, // represented by a job ID. // The in-memory locker enforces that the requests are validated against the // right owner. owner types.JobID // limit provides an upper limit on how many locks will be acquired limit uint // timeout is the initial lock duration when acquiring a new lock // during target acquisition. This should include TargetManagerAcquireTimeout // to allow for dynamic locking in the target manager. timeout time.Duration // locked is list of target IDs that were locked in this transaction (if any) locked []string // err reports whether there were errors in any lock-related operation. err chan error } type lock struct { owner types.JobID createdAt time.Time expiresAt time.Time } func validateRequest(req *request) error { if req == nil { return fmt.Errorf("got nil request") } if req.owner == 0 { return fmt.Errorf("owner cannot be zero") } for _, target := range req.targets { if target.ID == "" { return fmt.Errorf("target list cannot contain empty target ID. Full list: %v", req.targets) } } return nil } // broker is the broker of locking requests, and it's the only goroutine with // access to the locks map, in accordance with Go's "share memory by // communicating" principle. func broker(clk clock.Clock, lockRequests, unlockRequests <-chan *request, done <-chan struct{}) { locks := make(map[string]lock) for { select { case <-done: return case req := <-lockRequests: if err := validateRequest(req); err != nil { req.err <- fmt.Errorf("lock request: %w", err) continue } var lockErr error // newLocks is the state of the locks that have been modified by this transaction // If there is an error, we discard newLocks leaving the state of 'locks' untouched // otherwise we update 'locks' with the modifed locks after the transaction has completed newLocks := make(map[string]lock) for _, t := range req.targets { // don't go over limit if uint(len(newLocks)) >= req.limit { break } now := clk.Now() if l, ok := locks[t.ID]; ok { // target has been locked before. are/were we the owner? // if so, extend (even if previous lease has expired). if l.owner == req.owner { // we are trying to extend a lock. l.expiresAt = clk.Now().Add(req.timeout) newLocks[t.ID] = l } else { // no, it's not us. can we take over? if now.After(l.expiresAt) { if !req.requireLocked { // lock has expired, consider it unlocked newLocks[t.ID] = lock{ owner: req.owner, createdAt: now, expiresAt: now.Add(req.timeout), } } else { lockErr = fmt.Errorf("target %q must be locked but isn't", t) break } } else { // already locked if !req.allowConflicts { lockErr = fmt.Errorf("target %q is already locked by %d", t, l.owner) break } continue } } } else { if !req.requireLocked { // target not locked and never been, create new lock newLocks[t.ID] = lock{ owner: req.owner, createdAt: now, expiresAt: now.Add(req.timeout), } } else { lockErr = fmt.Errorf("target %q must be locked but isn't", t) break } } } req.locked = make([]string, 0, len(newLocks)) if lockErr == nil { // everything in this transaction was OK - update the locks for id, l := range newLocks { locks[id] = l req.locked = append(req.locked, id) } } req.err <- lockErr case req := <-unlockRequests: if err := validateRequest(req); err != nil { req.err <- fmt.Errorf("unlock request: %w", err) continue } req.ctx.Debugf("Requested to transactionally unlock %d targets: %v", len(req.targets), req.targets) // validate var unlockErr error for _, t := range req.targets { l, ok := locks[t.ID] if !ok { unlockErr = fmt.Errorf("unlock request: target %q is not locked", t) break } if l.owner != req.owner { unlockErr = fmt.Errorf("unlock request: target %q is locked by %d, not by %d", t, l.owner, req.owner) break } } // apply if unlockErr == nil { for _, t := range req.targets { delete(locks, t.ID) } } req.err <- unlockErr } } } // InMemory locks targets in an in-memory map. type InMemory struct { lockRequests, unlockRequests chan *request done chan struct{} } func newReq(ctx xcontext.Context, jobID types.JobID, targets []*target.Target) request { return request{ ctx: ctx, targets: targets, owner: jobID, err: make(chan error), } } // Lock locks the specified targets. func (tl *InMemory) Lock(ctx xcontext.Context, jobID types.JobID, duration time.Duration, targets []*target.Target) error { req := newReq(ctx, jobID, targets) req.timeout = duration req.requireLocked = false req.allowConflicts = false req.limit = uint(len(targets)) tl.lockRequests <- &req err := <-req.err ctx.Debugf("Lock %d targets for %s: %v", len(targets), duration, err) return err } // Lock locks the specified targets. func (tl *InMemory) TryLock(ctx xcontext.Context, jobID types.JobID, duration time.Duration, targets []*target.Target, limit uint) ([]string, error) { req := newReq(ctx, jobID, targets) req.timeout = duration req.requireLocked = false req.allowConflicts = true req.limit = limit tl.lockRequests <- &req // wait for result err := <-req.err ctx.Debugf("TryLock %d targets for %s: %d %v", len(targets), duration, len(req.locked), err) return req.locked, err } // Unlock unlocks the specified targets. func (tl *InMemory) Unlock(ctx xcontext.Context, jobID types.JobID, targets []*target.Target) error { req := newReq(ctx, jobID, targets) tl.unlockRequests <- &req err := <-req.err ctx.Debugf("Unlock %d targets: %v", len(targets), err) return err } // RefreshLocks extends the lock duration by the internally configured timeout. If // the owner is different, the request is rejected. func (tl *InMemory) RefreshLocks(ctx xcontext.Context, jobID types.JobID, duration time.Duration, targets []*target.Target) error { req := newReq(ctx, jobID, targets) req.timeout = duration req.requireLocked = true req.allowConflicts = false req.limit = uint(len(targets)) // refreshing a lock is just a lock operation with the same owner and a new // duration. tl.lockRequests <- &req err := <-req.err ctx.Debugf("RefreshLocks on %d targets for %s: %v", len(targets), duration, err) return err } // Close stops the brokern and releases resources. func (tl *InMemory) Close() error { close(tl.done) return nil } // New initializes and returns a new InMemory target locker. func New(clk clock.Clock) target.Locker { lockRequests := make(chan *request) unlockRequests := make(chan *request) done := make(chan struct{}) go broker(clk, lockRequests, unlockRequests, done) return &InMemory{ lockRequests: lockRequests, unlockRequests: unlockRequests, done: done, } }