func broker()

in plugins/targetlocker/inmemory/inmemory.go [74:177]


func broker(clk clock.Clock, lockRequests, unlockRequests <-chan *request, done <-chan struct{}) {
	locks := make(map[string]lock)
	for {
		select {
		case <-done:
			return
		case req := <-lockRequests:
			if err := validateRequest(req); err != nil {
				req.err <- fmt.Errorf("lock request: %w", err)
				continue
			}
			var lockErr error
			// newLocks is the state of the locks that have been modified by this transaction
			// If there is an error, we discard newLocks leaving the state of 'locks' untouched
			// otherwise we update 'locks' with the modifed locks after the transaction has completed
			newLocks := make(map[string]lock)
			for _, t := range req.targets {
				// don't go over limit
				if uint(len(newLocks)) >= req.limit {
					break
				}
				now := clk.Now()
				if l, ok := locks[t.ID]; ok {
					// target has been locked before. are/were we the owner?
					// if so, extend (even if previous lease has expired).
					if l.owner == req.owner {
						// we are trying to extend a lock.
						l.expiresAt = clk.Now().Add(req.timeout)
						newLocks[t.ID] = l
					} else {
						// no, it's not us. can we take over?
						if now.After(l.expiresAt) {
							if !req.requireLocked {
								// lock has expired, consider it unlocked
								newLocks[t.ID] = lock{
									owner:     req.owner,
									createdAt: now,
									expiresAt: now.Add(req.timeout),
								}
							} else {
								lockErr = fmt.Errorf("target %q must be locked but isn't", t)
								break
							}
						} else {
							// already locked
							if !req.allowConflicts {
								lockErr = fmt.Errorf("target %q is already locked by %d", t, l.owner)
								break
							}
							continue
						}
					}
				} else {
					if !req.requireLocked {
						// target not locked and never been, create new lock
						newLocks[t.ID] = lock{
							owner:     req.owner,
							createdAt: now,
							expiresAt: now.Add(req.timeout),
						}
					} else {
						lockErr = fmt.Errorf("target %q must be locked but isn't", t)
						break
					}
				}
			}
			req.locked = make([]string, 0, len(newLocks))
			if lockErr == nil {
				// everything in this transaction was OK - update the locks
				for id, l := range newLocks {
					locks[id] = l
					req.locked = append(req.locked, id)
				}
			}
			req.err <- lockErr
		case req := <-unlockRequests:
			if err := validateRequest(req); err != nil {
				req.err <- fmt.Errorf("unlock request: %w", err)
				continue
			}
			req.ctx.Debugf("Requested to transactionally unlock %d targets: %v", len(req.targets), req.targets)
			// validate
			var unlockErr error
			for _, t := range req.targets {
				l, ok := locks[t.ID]
				if !ok {
					unlockErr = fmt.Errorf("unlock request: target %q is not locked", t)
					break
				}
				if l.owner != req.owner {
					unlockErr = fmt.Errorf("unlock request: target %q is locked by %d, not by %d", t, l.owner, req.owner)
					break
				}
			}
			// apply
			if unlockErr == nil {
				for _, t := range req.targets {
					delete(locks, t.ID)
				}
			}
			req.err <- unlockErr
		}
	}
}