in unison/cell.go [82:145]
// Wait returns the cell's current value, blocking while there is nothing new
// to read.
//
// If readID and writeID differ on entry, the result of c.read() is returned
// right away. Otherwise the caller joins the active waiter session (or opens a
// new one) and blocks until either the session channel fires or cancel is
// done.
//
// When the session channel fires, the result of c.read() is returned with a nil
// error. When cancel fires, Wait returns (nil, cancel.Err()); the session
// channel is not consulted again in that case.
func (c *Cell) Wait(cancel Canceler) (interface{}, error) {
	c.mu.Lock()
	if c.readID != c.writeID {
		defer c.mu.Unlock()
		return c.read(), nil
	}

	// Join the session of whoever is already waiting. If nobody is, open a new
	// session, preferring the parked channel in waiterBuf over allocating one.
	signal := c.waiter
	if signal == nil {
		if signal = c.waiterBuf; signal != nil {
			c.waiterBuf = nil
		} else {
			signal = make(chan struct{})
		}
		c.waiter = signal
		c.waiterSessionID++
	}
	session := c.waiterSessionID
	c.numWaiter++
	c.mu.Unlock()

	select {
	case <-signal:
		// NOTE(review): per the original author's notes, Set (not visible in
		// this chunk) has already torn down the waiter state by the time the
		// channel fires, so there is nothing to clean up here.
		c.mu.Lock()
		defer c.mu.Unlock()
		return c.read(), nil

	case <-cancel.Done():
		// Cancellation wins once observed; the session channel is not checked
		// a second time.
		c.mu.Lock()
		defer c.mu.Unlock()

		// A different session ID means our session ended and a new one was
		// opened by another Wait call before we reacquired the mutex. The
		// current waiter state belongs to that new session, so leave it alone.
		if c.waiterSessionID != session {
			return nil, cancel.Err()
		}

		switch remaining := c.numWaiter - 1; {
		case remaining < 0:
			// The count was already reset (presumably by Set racing with this
			// cancellation). Keep numWaiter as it is instead of driving it
			// negative.
		case remaining == 0:
			// We were the last waiter and the channel was never signalled:
			// park it in waiterBuf for the next session.
			c.numWaiter = 0
			c.waiterBuf, c.waiter = c.waiter, nil
		default:
			c.numWaiter = remaining
		}
		return nil, cancel.Err()
	}
}