in plugins/targetlocker/inmemory/inmemory.go [74:177]
func broker(clk clock.Clock, lockRequests, unlockRequests <-chan *request, done <-chan struct{}) {
locks := make(map[string]lock)
for {
select {
case <-done:
return
case req := <-lockRequests:
if err := validateRequest(req); err != nil {
req.err <- fmt.Errorf("lock request: %w", err)
continue
}
var lockErr error
// newLocks is the state of the locks that have been modified by this transaction
// If there is an error, we discard newLocks leaving the state of 'locks' untouched
// otherwise we update 'locks' with the modifed locks after the transaction has completed
newLocks := make(map[string]lock)
for _, t := range req.targets {
// don't go over limit
if uint(len(newLocks)) >= req.limit {
break
}
now := clk.Now()
if l, ok := locks[t.ID]; ok {
// target has been locked before. are/were we the owner?
// if so, extend (even if previous lease has expired).
if l.owner == req.owner {
// we are trying to extend a lock.
l.expiresAt = clk.Now().Add(req.timeout)
newLocks[t.ID] = l
} else {
// no, it's not us. can we take over?
if now.After(l.expiresAt) {
if !req.requireLocked {
// lock has expired, consider it unlocked
newLocks[t.ID] = lock{
owner: req.owner,
createdAt: now,
expiresAt: now.Add(req.timeout),
}
} else {
lockErr = fmt.Errorf("target %q must be locked but isn't", t)
break
}
} else {
// already locked
if !req.allowConflicts {
lockErr = fmt.Errorf("target %q is already locked by %d", t, l.owner)
break
}
continue
}
}
} else {
if !req.requireLocked {
// target not locked and never been, create new lock
newLocks[t.ID] = lock{
owner: req.owner,
createdAt: now,
expiresAt: now.Add(req.timeout),
}
} else {
lockErr = fmt.Errorf("target %q must be locked but isn't", t)
break
}
}
}
req.locked = make([]string, 0, len(newLocks))
if lockErr == nil {
// everything in this transaction was OK - update the locks
for id, l := range newLocks {
locks[id] = l
req.locked = append(req.locked, id)
}
}
req.err <- lockErr
case req := <-unlockRequests:
if err := validateRequest(req); err != nil {
req.err <- fmt.Errorf("unlock request: %w", err)
continue
}
req.ctx.Debugf("Requested to transactionally unlock %d targets: %v", len(req.targets), req.targets)
// validate
var unlockErr error
for _, t := range req.targets {
l, ok := locks[t.ID]
if !ok {
unlockErr = fmt.Errorf("unlock request: target %q is not locked", t)
break
}
if l.owner != req.owner {
unlockErr = fmt.Errorf("unlock request: target %q is locked by %d, not by %d", t, l.owner, req.owner)
break
}
}
// apply
if unlockErr == nil {
for _, t := range req.targets {
delete(locks, t.ID)
}
}
req.err <- unlockErr
}
}
}