unison/cell.go (78 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package unison import "sync" // Cell stores some state of type interface{}. // Intermittent updates are lost, in case the Cell is updated faster than the // consumer tries to read for state updates. Updates are immediate, there will // be no backpressure applied to producers. // // In case the Cell is used to transmit updates without backpressure, the // absolute state must be computed by the producer beforehand. // // A typical use-case for cell is to generate asynchronous configuration updates (no deltas). // // The zero value of Cell is valid, but a value of type Cell can not be copied. type Cell struct { // All writes/reads to any of the internal fields must be guarded by mu. mu sync.Mutex // Current cell state to be Set and Get state interface{} // logical config state update counters. // The readID always follows writeID. We are using the most recent state // update if readID == waitID. writeID uint64 readID uint64 // waiter is not nil if we have at least on go-routine blocked in `Wait`. The // `waiter` channel will be closed on `Set`. waiter chan struct{} // mini-object pool. If a wait gets cancelled we move the active 'waiter' to // 'waiterBuf'. The next call to Wait will reuse the already allocated // resource. waiterBuf chan struct{} // number of go-routines that share the current waiter. `numWaiter` must be 0 if `waiter == nil`. // Invariant: The `waiter` must not be nil if `numWaiter > 0` numWaiter int // current `waiter` instance ID in order to track potential races between multiple go-routines using Wait. // We use fine grained locking. If `waiterSessionID` is increased since our last lock attempt, then our // current wait session is 'outdated' (numWaiter, waiter must not be modified). waiterSessionID uint } // NewCell creates a new call instance with its initial state. Subsequent reads // will return this state, if there have been no updates. func NewCell(st interface{}) *Cell { return &Cell{state: st} } // Get returns the current state. func (c *Cell) Get() interface{} { c.mu.Lock() defer c.mu.Unlock() return c.read() } // Wait blocks until it an update since the last call to Get or Wait has been found. // The cancel context can be used to interrupt the call to Wait early. The // error value will be set to the value returned by cancel.Err() in case Wait // was interrupted. Wait does not produce any errors that need to be handled by itself. func (c *Cell) Wait(cancel Canceler) (interface{}, error) { c.mu.Lock() if c.readID != c.writeID { defer c.mu.Unlock() return c.read(), nil } var waiter chan struct{} var waiterSession uint if c.waiter == nil { // no active waiter: start new session if c.waiterBuf != nil { waiter = c.waiterBuf c.waiterBuf = nil } else { waiter = make(chan struct{}) } c.waiter = waiter c.waiterSessionID++ } else { // some other go-routine is already waiting. Let's join the current session. waiter = c.waiter } waiterSession = c.waiterSessionID c.numWaiter++ c.mu.Unlock() select { case <-cancel.Done(): // we don't bother to check the waiter channel again. Cancellation if // detected has priority. c.mu.Lock() defer c.mu.Unlock() // if waiterID and c.waiterID do not match we have had a race with `Set` // cleaning up the waiter state and another go-routine already calling wait // before we managed to lock the mutex. In that case our waiterSession has // already been expired and we must not attempt to clean up the current // waiter state. if c.waiterSessionID == waiterSession { c.numWaiter-- if c.numWaiter < 0 { // Race between Set and context cancellation. Set did already clean up the overall waiter state. // We must not attempt to clean up the state again -> repair state by undoing the local cleanup c.numWaiter++ } else if c.numWaiter == 0 { // No more go-routine waiting for a state update and Set did not trigger yet. Let's clean up. c.waiterBuf = c.waiter c.waiter = nil } } return nil, cancel.Err() case <-waiter: c.mu.Lock() defer c.mu.Unlock() // waiter resource has been cleaned up by `Set`. Just read and return the // current known state. return c.read(), nil } } // Set updates the state of the Cell and unblocks a waiting consumer. // Set does not block. func (c *Cell) Set(st interface{}) { c.mu.Lock() defer c.mu.Unlock() c.writeID++ c.state = st if c.waiter != nil { close(c.waiter) c.waiter = nil c.numWaiter = 0 } } // read returns the current state and ensures that the next wait operation will // only block correctly if there was no Set since the last read. // // IMPORTANT: c.mu MUST be locked while calling read. func (c *Cell) read() interface{} { c.readID = c.writeID return c.state }