pkg/utils/broadcaster/broadcaster.go (239 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package broadcaster import ( "context" "reflect" ) // Broadcaster is a helper that tracks the current value of some piece of // data and forwards changes to a list of subscribers. It is for situations // where a variable list of subscribers need to observe some piece of state, // but it is important that the component that owns that state doesn't block. // It forwards data via synchronous channels, and subscribers can choose to // receive only the latest value, or to buffer changes up to some configurable // limit. // // For best results, Broadcaster's input should be owned and accessed by a // single goroutine, and that goroutine is responsible for providing updates // in the correct order. Broadcaster is still "safe" to use from multiple // goroutines, but its intended guarantees no longer apply: Broadcaster was // written to track atomic values from an authoritative source. If multiple // goroutines can write to it then there is no longer a well-defined "current" // value, and you might want to consider an alternate pattern. // Broadcaster has the following constraints / performance guarantees: // // - Broadcaster takes ownership of any values that are sent to it. If // appropriate for the data being observed, Broadcaster's owner should // create a deep copy before passing it in. If the datatype is a pointer // or contains pointers, subscribers should make their own deep copies // before modifying them. // // The most reliable way to guarantee safety is to use Broadcaster only // with value types / shallow structs. // // The most efficient way to guarantee safety (unless the required heap // allocations are prohibitive) is to use Broadcaster with a pointer type // that will never be modified, and to have trustworthy subscribers. // // - The input channel is weakly non-blocking: it blocks only for the time // it takes to copy the received values to an internal buffer (which means // in practice it will never block unless you write to it in a spin lock). // In particular, the input is unaffected by congestion or errors in its // subscribers. // // - For a subscriber with buffer length n, the output channel will always // return the oldest value that has not yet been sent on that channel among // the n most recent values since the subscription was started. In // particular, a subscriber with buffer length 0 will always receive the // most recent value at the time of the read. // // - If a subscriber channel has already received the n most recent values, // reads will block until a new value is set. // // - A new subscription will not hard-block on its first read, i.e. it is always // primed with the current value. // // Broadcaster can monitor the entire lifecycle of components, including the // shutdown state of terminating components, so instead of an all-or-nothing // context it uses a manual shutdown to allow for graceful delivery of the // final values. // // To indicate that there will be no more values, and gradually remove // subscribers as they reach the final value, call close(b.InputChan). // The run loop terminates when all subscribers are caught up or canceled. // // To shutdown Broadcaster immediately, closing all subscribers regardless // of whether they have received the final values, call Broadcaster.Close. // // To shutdown gradually, giving subscribers 5 seconds to catch up: // // close(b.InputChan) // select { // case <-b.Done(): // case <-time.After(5*time.Second): // b.Close() // } // // A caller that wants to clean up Broadcaster's data and run loop is // responsible for choosing one of these three options on shutdown. // A caller that needs to wait until Broadcaster's run loop is completely // finished (e.g. tests waiting for delivery of the final shutdown state) // should wait on b.Done() after initiating shutdown. type Broadcaster[T any] struct { // Values to be broadcasted should be sent to InputChan. InputChan chan T // Subscribe sends requests to subscribeChan where they are received // by runLoop and added to subscribers. subscribeChan chan subscribeRequest[T] // getChan is used by Get, which provides a way for non-subscribers to // do a one-time read of the most recent value. getChan chan T // Close writes to shutdownChan to notify Broadcaster that it should // close all subscribers and return. // Note that this signal uses a write rather than closing shutdownChan, // because closing a channel twice causes a panic, whereas writing to // it twice can be cancelled by selecting on doneChan as well. // (Hopefully callers won't invoke Close twice, but if they do it's // polite to avoid panicking.) shutdownChan chan struct{} // doneChan is closed when the run loop exits. It is exposed via // Done(), and callers can use it to wait until the Broadcaster // has completely finished. doneChan chan struct{} ///////////////////////////////////////////////////////////////////////// // Internal run loop state. Fields below could be local variables // in the run loop, but we break the run loop into individual iterations // and expose its fields to allow for deterministic unit tests. // A buffer with the most recent observed states. len(buffer) is // the largest buffer request that will be accepted from a subscriber. buffer []T // The list of current subscribers. A subscriber is removed from this // list when its context ends. subscribers []subscriber[T] // The cases that will be passed to reflect.Select. In order (see the // index* constants below): // - InputChan listener // - subscribeChan listener // - shutdownChan listener // - getChan writer // - For each subscriber, in the same order as the subscribers array, // two cases: the first reading the subscriber's context channel, // the second selectCases []reflect.SelectCase // index is initialized to zero and incremented with every new value, and // is used to track where subscribers are in Broadcaster's buffer relative // to the most recent value. Broadcaster's most recent value is in // buffer[index % len(buffer)] (i.e. "index" is the position of the current // value once we account for wrapping around at the end of the array). index int // shuttingDown indicates that InputChan has been closed. When this is true, // subscribers who finish reading all pending values have their listener // channels closed and are removed from the subscriber list, and when all // subscribers are removed the run loop returns. shuttingDown bool } // constant indices representing the order of the cases in // Broadcaster.selectCases. For context/listener cases for each subscriber // call indexCtxCase() and indexSubscriberCase() with the subscriber index. const ( indexInputCase = 0 indexSubscribeCase = 1 indexShutdownCase = 2 indexGetCase = 3 indexFirstSubscriberCase = 4 ) // subscribeRequest is sent to Broadcaster.subscribeChan to add a new // subscriber. type subscribeRequest[T any] struct { ctx context.Context listenerChan chan T bufferLen int } type subscriber[T any] struct { // The channel to send new values to. listenerChan chan T // The index of the next value that this subscriber should receive. // If subscriber.index == Broadcaster.index then the subscriber is waiting // to receive the most recent vaue. If subscriber.index > Broadcaster.index // then it has received all current values and will block until a new one // arrives. index int // How many values this subscriber will buffer in between reads. If bufferLen // is zero, this subscriber will always read the most recent value, otherwise // it will read the most recent values in order up to a limit of bufferLen. // bufferLen must be at most len(Broadcaster.buffer). bufferLen int } // New creates a Broadcaster and starts its run loop. The caller is responsible // for calling (*Broadcaster).Close to terminate the run loop when Broadcaster // should be cleaned up. (Cleanup is explicit rather than via a context so // subscribers can optionally read the final values before shutdown.) // The parameters specify the initial value, the size of the internal value // buffer, and the size of the input channel buffer. // // Race warning: if inputBuffer > 0 then the input's current value may be // slightly (~1 scheduler cycle) ahead of its subscribers. In particular this // sequence: // // b.InputChan <- newValue // b.Get() // // is not guaranteed to return newValue, since the read and write may arrive // at the Broadcaster run loop simultaneously. Often this is ok -- the owning // goroutine always has synchronous access to the data being broadcasted, and // most subscriber use cases care more about reliable reporting of the value // sequence than sub-ms delays -- but callers who really need strict causality // and are willing to block a little more to get it can set inputBuffer to 0, // which will guarantee that any Get / unbuffered subscriber read after writing // to InputChan will definitely receive the new value. func New[T any]( initialValue T, maxSubscriberBuffer int, inputBuffer int, ) *Broadcaster[T] { b := new(initialValue, maxSubscriberBuffer, inputBuffer) go b.runLoop() return b } // Subscribe adds a new subscriber to Broadcaster, returning a channel that // the subscriber can listen on for new values. While active, the channel // will return all changes to Broadcaster's tracked value, storing up to // bufferLen most recent values if the subscriber does not read them in time. // bufferLen values larger than the maxBuffer value passed to broadcaster.New // are capped. If bufferLen is zero, the listener channel always returns the // most recent value at the time of the read. // The channel will be closed when the given context expires, or when // Broadcaster itself shuts down. All returned channels are guaranteed to // produce at least one value on initial subscription, even if Broadcaster // has shut down. func (b *Broadcaster[T]) Subscribe(context context.Context, bufferLen int) chan T { req := subscribeRequest[T]{ ctx: context, listenerChan: make(chan T), bufferLen: bufferLen, } select { case b.subscribeChan <- req: // If the request goes through then we were successful, we can return // the output channel. return req.listenerChan case <-b.doneChan: // The Broadcaster has shut down. Return a closed channel with the final // value buffered inside, so we don't break callers that rely on our // guarantee that the first read always succeeds. closedChan := make(chan T, 1) closedChan <- b.currentValue() close(closedChan) return closedChan } } // Get returns Broadcaster's current value. It always succeeds, even if // Broadcaster has been closed, and it always gives the most recent value // at the time of the call. (However, remember that it's possible for the // value to change again in between the time that Get reads it and the // time that Get returns to the caller. If you need to know about changes // to the current value, use Subscribe instead.) func (b *Broadcaster[T]) Get() T { select { case value := <-b.getChan: return value case <-b.doneChan: // If doneChan is closed, then the run loop has shut down and the current // value will never change again, so it is safe to read it directly from // the caller's goroutine. return b.currentValue() } } // Close all subscriber channels, discarding any unsent values, and terminate // Broadcaster's run loop. // Close returns immediately once the shutdown signal is sent. Callers that // want to make sure the run loop has terminated completely should wait on // b.Done() after calling Close. // Callers who are shutting down but want Broadcaster to finish sending // the existing values to its subscribers before terminating should instead // call close(b.InputChan). func (b *Broadcaster[T]) Close() { // Try to send a shutdown signal, but cancel out if doneChan returns because // that means it already happened. select { case b.shutdownChan <- struct{}{}: case <-b.doneChan: } } // Done returns a channel callers can wait on to detect that Broadcaster has // shut down. When this channel is closed, Broadcaster's run loop has ended // and it will not send any more values. func (b *Broadcaster[T]) Done() <-chan struct{} { return b.doneChan } // The index in the selectCases array of the reflect.SelectCase for the // given subscriber's listener channel. func indexListenerCase(subscriberIndex int) int { return indexFirstSubscriberCase + 2*subscriberIndex + 1 } // new is the internal implementation of New that does everything except // start the run loop, for tests that want to handle execution steps // manually/synchronously. func new[T any]( initialValue T, maxSubscriberBuffer int, inputBuffer int, ) *Broadcaster[T] { b := &Broadcaster[T]{ InputChan: make(chan T, inputBuffer), // Non-input channels are synchronous: subscribers have to wait for us // to respond, only Broadcaster's owner gets to be completely non-blocking. subscribeChan: make(chan subscribeRequest[T]), getChan: make(chan T), shutdownChan: make(chan struct{}), doneChan: make(chan struct{}), // The array is maxBuffer+1 because we need to store the current value // in addition to any "buffered" values. If maxBuffer is 0, then we // store only the current value and the array is length 1. If // maxBuffer is 5, then we will store 5 previous values in addition to // the current one. buffer: make([]T, maxSubscriberBuffer+1), } // fixed order for the core select cases: input, subscribe, shutdown, get. b.selectCases = []reflect.SelectCase{ { Dir: reflect.SelectRecv, Chan: reflect.ValueOf(b.InputChan), }, { Dir: reflect.SelectRecv, Chan: reflect.ValueOf(b.subscribeChan), }, { Dir: reflect.SelectRecv, Chan: reflect.ValueOf(b.shutdownChan), }, { Dir: reflect.SelectSend, Chan: reflect.ValueOf(b.getChan), Send: reflect.ValueOf(initialValue), }, } b.buffer[b.index] = initialValue return b } // runLoop is Broadcaster's main loop, which listens for updates from the // input and sends any buffered values to registered subscribers. // Its logic is implemented in runLoopIterate. func (b *Broadcaster[T]) runLoop() { defer close(b.doneChan) for b.runLoopIterate() { } } // runLoopIterate is the interior of the run loop, which is split into its own // helper function so we can deterministically test Broadcaster's behavior in // the unit tests. // Returns true if the loop should keep running, false to shut down. func (b *Broadcaster[T]) runLoopIterate() bool { b.drainInput() chosen, recvValue, recvOK := reflect.Select(b.selectCases) switch chosen { case indexInputCase: if recvOK { // We've received a new value, add it to the internal buffer and update // listener select cases. value := recvValue.Interface().(T) b.handleNewInput(value) b.updateListeners() } else { // The input channel is closed, begin shutting down. Setting shuttingDown // to true means subscribers will be removed as they finish reading // buffered values, and when all active subscribers are removed runLoop // will return. We also clear the channel in input's select case, so we // can keep iterating without hitting this case again. b.shuttingDown = true b.selectCases[indexInputCase].Chan = reflect.ValueOf(nil) } case indexSubscribeCase: req := recvValue.Interface().(subscribeRequest[T]) b.handleNewSubscriber(req) case indexShutdownCase: b.shutdown() case indexGetCase: // Someone has read our current state, but we don't need to do anything. default: // The selected case is from one of our subscribers. // Each subscriber covers two cases, figure out which one we got. subscriberIndex := (chosen - indexFirstSubscriberCase) / 2 if (chosen-indexFirstSubscriberCase)%2 == 0 { // The first case for each subscriber is the shutdown channel -- remove // this subscriber from the list. b.removeSubscriber(subscriberIndex) } else { // The second case for each subscriber is their listener channel -- they // just received a value, advance their position in the buffer. b.advanceSubscriber(subscriberIndex) } } // Keep iterating if shutdown hasn't been initiated and/or we still have // active subscribers. return !b.shuttingDown || len(b.subscribers) > 0 } // drainInput reads as many values as possible from the input channel and // updates subscriber state if there were any new values. // This is called in the run loop before the main select, to make sure input // values get priority over any other signal. func (b *Broadcaster[T]) drainInput() { receivedInput := false defer func() { // If we received any values, refresh the subscriber state. if receivedInput { b.updateListeners() } }() for { select { case value, ok := <-b.InputChan: // If the channel is closed, return immediately so runLoopIterate // can handle it. Otherwise, handle any input. if !ok { return } b.handleNewInput(value) receivedInput = true default: return } } } // currentValue is a simple internal helper to return the most recent value // in the buffer. func (b *Broadcaster[T]) currentValue() T { return b.buffer[b.index%len(b.buffer)] } // handleNewInput adds an incoming value to the internal buffer. Callers should // also call updateListeners() when all changes are done. (handleNewInput // doesn't do this so we can avoid redundant passes through the subscriber list // when draining the input channel. In practice this is rare, but if the input // channel ever does gets backed up then draining it is our highest priority.) func (b *Broadcaster[T]) handleNewInput(value T) { b.index++ b.buffer[b.index%len(b.buffer)] = value } func (b *Broadcaster[T]) handleNewSubscriber(req subscribeRequest[T]) { // Add two select cases for the new subscriber, one for its context channel // and one for its listener channel. b.selectCases = append(b.selectCases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(req.ctx.Done()), }, reflect.SelectCase{ Dir: reflect.SelectSend, Chan: reflect.ValueOf(req.listenerChan), Send: reflect.ValueOf(b.currentValue()), }, ) // Cap the requested buffer length at the maximum set when Broadcaster // was created. // Subtlety: in order for a bufferLen of 0 to correspond to the most // recent value, the actual buffer inside Broadcaster must have length // at least bufferLen+1, so bufferLen is capped at len(buffer)-1. bufferLen := req.bufferLen if bufferLen > len(b.buffer)-1 { bufferLen = len(b.buffer) - 1 } b.subscribers = append(b.subscribers, subscriber[T]{ listenerChan: req.listenerChan, index: b.index, // Start with the current value bufferLen: bufferLen, }) } // Close the target subscriber's listener channel and remove it from the lists // of subscribers and select cases. func (b *Broadcaster[T]) removeSubscriber(subscriberIndex int) { close(b.subscribers[subscriberIndex].listenerChan) b.subscribers = append(b.subscribers[:subscriberIndex], b.subscribers[subscriberIndex+1:]...) // Index in the select cases is the index of the first subscriber // plus two cases for each subscriber. caseIndex := indexFirstSubscriberCase + 2*subscriberIndex b.selectCases = append(b.selectCases[:caseIndex], b.selectCases[caseIndex+2:]...) } // advanceSubscriber is called when a subscriber reads a value, to advance // it to the next index and either prepare its listener select case with the // next value or block it if there are no more values. func (b *Broadcaster[T]) advanceSubscriber(subscriberIndex int) { s := &b.subscribers[subscriberIndex] s.index++ selectCaseIndex := indexListenerCase(subscriberIndex) if s.index > b.index { // No more values to read. If we're shutting down, remove // this subscriber, otherwise just block the channel for now. if b.shuttingDown { b.removeSubscriber(subscriberIndex) } else { b.selectCases[selectCaseIndex].Chan = reflect.ValueOf(nil) } } else { // Load the send channel with the buffer value at s.index b.selectCases[selectCaseIndex].Send = reflect.ValueOf(b.buffer[s.index%len(b.buffer)]) } } // updateListeners is called after new input comes in to advance subscriber // position if necessary and reactivate their listener channels with the new // values, and to load the most recent value into getChan. func (b *Broadcaster[T]) updateListeners() { for i := range b.subscribers { selectCaseIndex := indexListenerCase(i) subscriber := &b.subscribers[i] if subscriber.index <= b.index { // The subscriber hasn't read the most recent value, unblock its channel. b.selectCases[selectCaseIndex].Chan = reflect.ValueOf(subscriber.listenerChan) } if subscriber.index < b.index-subscriber.bufferLen { // If the subscriber is further behind than its buffer size, then // we need to advance it. subscriber.index = b.index - subscriber.bufferLen } // Load the subscriber's new target value into its send channel. b.selectCases[selectCaseIndex].Send = reflect.ValueOf(b.buffer[subscriber.index%len(b.buffer)]) } // Update getChan, used for standalone reads from non-subscribers. b.selectCases[indexGetCase].Send = reflect.ValueOf(b.currentValue()) } // shutdown sets the shuttingDown flag and closes all subscribers, so the // run loop will return after the current iteration. func (b *Broadcaster[T]) shutdown() { b.shuttingDown = true // Possibly overkill, but remove subscribers in reverse order so the array // can be truncated as we go instead of copying the whole thing each step. for i := len(b.subscribers) - 1; i >= 0; i-- { b.removeSubscriber(i) } }