events/events.go (92 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package events
import (
"sync"
"time"
)
// Event is the value handed to listeners. It is an empty interface, so any
// value satisfies it; it is type switched when handled.
type Event interface{}
// An EventListener handles events given to it by the Ringpop, as well as
// forwarded events from the SWIM node contained by the ringpop. HandleEvent
// should be thread safe: AsyncEventEmitter calls it from a separate goroutine
// for every emitted event.
type EventListener interface {
// HandleEvent is invoked with every event emitted on an emitter this
// listener is registered with.
HandleEvent(event Event)
}
// EventEmitter can add and remove listeners which will be invoked when an event
// is emitted.
type EventEmitter interface {
// AddListener adds a listener that will be invoked when an event is
// emitted via EmitEvent. The value returned indicates if the listener has
// been added or not. The operation will not fail, but a listener will only
// be added once.
AddListener(EventListener) bool
// RemoveListener removes a listener and prevents it from being invoked for
// subsequently emitted events. The return value indicates if a listener has
// been removed or not.
RemoveListener(EventListener) bool
// EmitEvent invokes all the listeners that are registered with the
// EventEmitter.
EmitEvent(Event)
}
// sharedEventEmitter keeps the list of registered listeners. It is embedded by
// AsyncEventEmitter and SyncEventEmitter, which add their own EmitEvent.
type sharedEventEmitter struct {
// listeners is a slice keeping all added EventListener interfaces. The slice
// is only assigned, but never altered in place. When the slice is accessed
// an appropriate lock needs to be acquired on listenersLock. After reading
// the slice it is safe to release the lock because the underlying array is
// never changed in place.
listeners []EventListener
// listenersLock guards reads and assignments of the listeners field.
listenersLock sync.RWMutex
}
// AddListener registers l on the EventEmitter so that HandleEvent is called on
// it for events emitted afterwards. It reports whether l has been added: a nil
// listener and a listener that is already registered are both rejected with
// false.
func (a *sharedEventEmitter) AddListener(l EventListener) bool {
	// a nil listener is never registered, invoking it while emitting an event
	// would cause a nil pointer dereference
	if l == nil {
		return false
	}

	a.listenersLock.Lock()
	defer a.listenersLock.Unlock()

	registered := a.listeners
	for i := range registered {
		if registered[i] == l {
			// every listener is registered at most once
			return false
		}
	}

	// build a brand new slice instead of appending in place, so the backing
	// array of the slice that was stored before is never written to. That is
	// what allows an emitter to grab the slice while locked and iterate it
	// while not locked, preventing deadlocks when listeners are added/removed
	// from within the handler of a listener
	grown := make([]EventListener, len(registered)+1)
	copy(grown, registered)
	grown[len(registered)] = l
	a.listeners = grown

	return true
}
// RemoveListener unregisters l from the EventEmitter, so that subsequent calls
// to EmitEvent no longer call HandleEvent on it. It reports whether a listener
// has been removed; false means l was not registered in the first place.
func (a *sharedEventEmitter) RemoveListener(l EventListener) bool {
	a.listenersLock.Lock()
	defer a.listenersLock.Unlock()

	for pos, candidate := range a.listeners {
		if candidate != l {
			continue
		}

		// assemble a new slice holding everything except the listener at pos;
		// the slice that was stored before is left untouched so that it can
		// still be iterated without holding the lock
		remaining := make([]EventListener, len(a.listeners)-1)
		copy(remaining, a.listeners[:pos])
		copy(remaining[pos:], a.listeners[pos+1:])
		a.listeners = remaining

		return true
	}

	return false
}
// AsyncEventEmitter is an EventEmitter that calls every listener in its own
// goroutine. Registration of listeners (AddListener and RemoveListener) comes
// from the embedded sharedEventEmitter.
type AsyncEventEmitter struct {
sharedEventEmitter
}
// EmitEvent sends the event to all registered listeners. HandleEvent is
// started in a new goroutine for every listener; EmitEvent does not wait for
// the handlers to return.
func (a *AsyncEventEmitter) EmitEvent(event Event) {
	// the read lock is held only while the goroutines are being started, the
	// handlers themselves run without it
	a.listenersLock.RLock()
	defer a.listenersLock.RUnlock()

	for i := range a.listeners {
		go a.listeners[i].HandleEvent(event)
	}
}
// SyncEventEmitter is an EventEmitter that calls the listeners one after the
// other in the calling goroutine. Registration of listeners (AddListener and
// RemoveListener) comes from the embedded sharedEventEmitter.
type SyncEventEmitter struct {
sharedEventEmitter
}
// EmitEvent sends the event to all registered listeners. The listeners are
// called one by one, in registration order, on the calling goroutine;
// EmitEvent returns after the last handler has returned.
func (a *SyncEventEmitter) EmitEvent(event Event) {
	// only the slice itself is taken while holding the lock, the listeners are
	// called after the lock has been released. This makes it possible for a
	// listener to remove itself during its invocation without running into a
	// deadlock. Reading the slice without the lock is safe because its
	// underlying array is immutable (by convention)
	a.listenersLock.RLock()
	snapshot := a.listeners
	a.listenersLock.RUnlock()

	for i := range snapshot {
		snapshot[i].HandleEvent(event)
	}
}
// A RingChangedEvent is sent when servers are added to, updated in and/or
// removed from the ring. The three fields carry the affected servers as
// strings.
// NOTE(review): the strings are presumably the servers' addresses — confirm
// against the code emitting this event.
type RingChangedEvent struct {
ServersAdded []string
ServersUpdated []string
ServersRemoved []string
}
// RingChecksumEvent is sent when a server is removed or added and a new checksum
// for the ring is calculated. It carries both the previous and the new values.
type RingChecksumEvent struct {
// OldChecksum contains the previous legacy checksum. Note: might be deprecated in the future.
OldChecksum uint32
// NewChecksum contains the new legacy checksum. Note: might be deprecated in the future.
NewChecksum uint32
// OldChecksums contains the map of previous checksums.
// NOTE(review): the string key presumably names the checksum — confirm.
OldChecksums map[string]uint32
// NewChecksums contains the map with new checksums, keyed like OldChecksums.
NewChecksums map[string]uint32
}
// A LookupEvent is sent when a lookup is performed on the Ringpop's ring.
// NOTE(review): Key is presumably the key that was looked up and Duration the
// time the lookup took — confirm against the code emitting this event.
type LookupEvent struct {
Key string
Duration time.Duration
}
// A LookupNEvent is sent when a lookupN is performed on the Ringpop's ring.
// NOTE(review): Key is presumably the key that was looked up, N the number of
// results asked for and Duration the time the lookup took — confirm against
// the code emitting this event.
type LookupNEvent struct {
Key string
N int
Duration time.Duration
}
// Ready is fired when ringpop has successfully bootstrapped and is ready to
// receive requests and other method calls. The event carries no data.
type Ready struct{}
// Destroyed is fired when ringpop has been destroyed and should not be
// responding to requests or lookup requests. The event carries no data.
type Destroyed struct{}