internal/events/events.go (278 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package events is an events processing layer.
package events
import (
"context"
"fmt"
"sync"
"sync/atomic"
"github.com/GoogleCloudPlatform/galog"
)
// instance is the package-level events Manager. It is assigned by init() and
// handed out by FetchManager().
var instance *Manager
// Watcher defines the interface between the events manager and the actual
// watcher implementation.
type Watcher interface {
// ID returns the watcher id. The manager uses it as the key to reject
// duplicate registrations in AddWatcher() and to find the watcher's events in
// RemoveWatcher().
ID() string
// Events returns a slice with all the event types a given Watcher handles.
// The manager keeps one WatcherEventType, and runs one goroutine, per
// returned event type.
Events() []string
// Run implements the actual "listening" strategy and emits an event "signal".
// The context it receives is canceled by the manager when the watcher is
// removed. It must return:
// - [bool] if the watcher should renew (run again).
// - [any] an event context data pointer further describing the
// event (if needed).
// - [err] error in case the Watcher failed and wants to notify subscribers
// (see EventData).
Run(ctx context.Context, evType string) (bool, any, error)
}
// Manager defines the interface between events management layer and the
// core guest agent implementation.
type Manager struct {
// watcherEvents lists every (watcher, event type) pair registered through
// AddWatcher(). AddWatcher() appends to it while holding watchersMutex.
watcherEvents []*WatcherEventType
// watchersMutex protects watchersMap; AddWatcher() also holds it while
// appending to watcherEvents.
watchersMutex sync.RWMutex
// watchersMap is a convenient manager's mapping of registered watcher
// instances, keyed by watcher ID.
watchersMap map[string]bool
// removingWatcherEventsMutex protects the removingWatcherEvents map.
removingWatcherEventsMutex sync.RWMutex
// removingWatcherEvents is a map, keyed by event type, of the watcher events
// being removed. Entries are set by RemoveWatcher() and cleared by
// deleteRemovingEvent().
removingWatcherEvents map[string]bool
// running is a flag indicating if the Run() was previously called.
running atomic.Bool
// subscribersMutex protects subscribers member/map of the manager object.
subscribersMutex sync.RWMutex
// subscribers maps an event type to its subscribed callbacks.
subscribers map[string][]*EventSubscriber
// queue manages the running watchers; when its length gets down to zero it
// means all watchers are done and we can signal the other control go routines
// to leave (given we don't have any more job left to process).
queue *watcherQueue
}
// watcherQueue wraps the watchers <-> callbacks communication as well as the
// communication/coordination of the multiple control go routines, i.e. the one
// responsible for calling callbacks after an event is produced by the watcher
// etc.
type watcherQueue struct {
// queueMutex protects the access to watchersMap.
queueMutex sync.RWMutex
// watchersMap tracks the currently running watchers, keyed by event type.
watchersMap map[string]bool
// watcherDone is a channel used to communicate that a given watcher is
// finished/done; the value sent is the watcher's event type.
watcherDone chan string
// dataBus is the channel used to communicate between watchers
// (event producer) and the callback handler (event consumer managing go
// routine).
dataBus chan eventBusData
}
// EventData wraps the data communicated from a Watcher to a Subscriber. It is
// built from the values returned by Watcher.Run().
type EventData struct {
// Data points to the Watcher provided data.
Data any
// Error is used when a Watcher has failed and wants to communicate the error
// to its subscribers.
Error error
}
// WatcherEventType wraps/couples together a Watcher and an event type.
type WatcherEventType struct {
// watcher is the watcher implementation for a given event type.
watcher Watcher
// evType identifies the event type this object refers to.
evType string
// removed is a channel used to tell the running watcher go routine that it
// shouldn't renew even if the watcher requested a renew (in response to a
// RemoveWatcher() call). The watcher go routine also sends on it itself when
// it finishes without having been removed.
removed chan bool
}
// EventSubscriber represents the subscriber for an event.
type EventSubscriber struct {
// Name identifies the event subscriber; IsSubscribed() and Unsubscribe()
// match subscribers by this name.
Name string
// Data points to the event subscriber provided data that will be passed down
// during callback.
Data any
// Callback is the callback function that will be called when a new event
// happens.
Callback EventCb
}
// eventBusData is the message a watcher go routine sends over the queue's
// dataBus channel after each Watcher.Run() call.
type eventBusData struct {
// evType is the event type that produced this message; it selects which
// subscribers get called.
evType string
// data carries the watcher's returned data and error to the subscribers.
data *EventData
}
// EventCb defines the callback interface between watchers and subscribers. The
// arguments are:
// - ctx the app's context passed in from the manager's Run() call.
// - evType a string defining what event type triggered the call.
// - data a user context pointer to be consumed by the callback (the
// subscriber's EventSubscriber.Data).
// - evData an event specific data pointer.
//
// The callback should return true if it wants to renew, returning false will
// cause the callback to be unregistered/unsubscribed.
type EventCb func(ctx context.Context, evType string, data any, evData *EventData) bool
// length reports the number of watchers currently tracked as running. The
// count is read under the queue's read lock.
func (wq *watcherQueue) length() int {
	wq.queueMutex.RLock()
	running := len(wq.watchersMap)
	wq.queueMutex.RUnlock()
	return running
}
// add marks the watcher handling evType as running. Adding an event type that
// is already tracked leaves the queue unchanged.
func (wq *watcherQueue) add(evType string) {
	mu := &wq.queueMutex
	mu.Lock()
	defer mu.Unlock()

	wq.watchersMap[evType] = true
}
// del drops the watcher handling evType from the queue and returns how many
// watchers are still tracked as running afterwards.
func (wq *watcherQueue) del(evType string) int {
	wq.queueMutex.Lock()
	delete(wq.watchersMap, evType)
	remaining := len(wq.watchersMap)
	wq.queueMutex.Unlock()
	return remaining
}
// newManager builds an events Manager with every map allocated and a watcher
// queue whose channels are both unbuffered.
func newManager() *Manager {
	queue := &watcherQueue{
		watchersMap: make(map[string]bool),
		watcherDone: make(chan string),
		dataBus:     make(chan eventBusData),
	}

	mgr := &Manager{
		queue:                 queue,
		subscribers:           make(map[string][]*EventSubscriber),
		removingWatcherEvents: make(map[string]bool),
		watchersMap:           make(map[string]bool),
	}
	return mgr
}
// init allocates the package-level Manager so FetchManager() always has an
// instance to return.
func init() {
instance = newManager()
}
// FetchManager hands out the package-level Manager. It panics if that instance
// has not been allocated.
func FetchManager() *Manager {
	if mgr := instance; mgr != nil {
		return mgr
	}
	panic("The event's manager instance should had being initialized.")
}
// IsSubscribed returns true if a subscriber named subID is subscribed to the
// given event type, and false otherwise (including when nobody is subscribed
// to evType at all).
//
// The lookup never mutates the subscribers map, so it only takes the read side
// of subscribersMutex and does not serialize against other readers.
func (m *Manager) IsSubscribed(evType, subID string) bool {
	m.subscribersMutex.RLock()
	defer m.subscribersMutex.RUnlock()

	// An unknown event type yields a nil slice; ranging over it is a no-op.
	for _, curr := range m.subscribers[evType] {
		if curr.Name == subID {
			return true
		}
	}
	return false
}
// Subscribe registers sub as a consumer of evType events. sub.Callback is
// invoked with sub.Data every time a watcher emits an evType event. The
// manager stores a pointer to its own copy of sub.
func (m *Manager) Subscribe(evType string, sub EventSubscriber) {
	m.subscribersMutex.Lock()
	defer m.subscribersMutex.Unlock()

	current := m.subscribers[evType]
	m.subscribers[evType] = append(current, &sub)
}
// Unsubscribe drops every subscriber named subscriber from the evType event
// type. When nobody is left the event type's entry is removed from the
// subscribers map altogether.
func (m *Manager) Unsubscribe(evType string, subscriber string) {
	m.subscribersMutex.Lock()
	defer m.subscribersMutex.Unlock()

	// Build a fresh slice instead of filtering in place: Run() iterates a
	// snapshot of the current slice without holding the lock.
	var remaining []*EventSubscriber
	for _, sub := range m.subscribers[evType] {
		if sub.Name == subscriber {
			continue
		}
		remaining = append(remaining, sub)
	}

	if len(remaining) > 0 {
		m.subscribers[evType] = remaining
		return
	}

	galog.Debugf("No more subscribers left for evType: %s", evType)
	delete(m.subscribers, evType)
}
// deleteRemovingEvent clears the "being removed" mark of the evType event
// type. Clearing an event type that is not marked is a no-op.
func (m *Manager) deleteRemovingEvent(evType string) {
	m.removingWatcherEventsMutex.Lock()
	delete(m.removingWatcherEvents, evType)
	m.removingWatcherEventsMutex.Unlock()
}
// RemoveWatcher removes a watcher from the event manager. Each running watcher
// has its own context (derived from the one handed to its go routine) and will
// have it canceled after calling this method.
//
// The watcher's registration is dropped right away. For each of its event
// types that is not already being removed, the event type is marked as being
// removed and, if the manager is running, the watcher's go routine is signaled
// through the event's removed channel.
//
// ctx bounds how long the call waits for a watcher go routine to pick up the
// signal: if ctx is done before the signal is received the event type's
// "being removed" mark is rolled back and the call moves on. A nil ctx waits
// without bound.
func (m *Manager) RemoveWatcher(ctx context.Context, watcher Watcher) {
	m.removingWatcherEventsMutex.Lock()
	defer m.removingWatcherEventsMutex.Unlock()

	id := watcher.ID()
	galog.Debugf("Got a request to remove watcher: %s", id)

	// AddWatcher() appends to watcherEvents while holding watchersMutex, so the
	// slice is snapshotted in the same critical section that drops the
	// registration instead of being read unguarded later on.
	m.watchersMutex.Lock()
	_, found := m.watchersMap[id]
	delete(m.watchersMap, id)
	events := m.watcherEvents
	m.watchersMutex.Unlock()

	if !found {
		galog.Debugf("Watcher(%s) was not found, skipping removal request", id)
		return
	}

	// A nil channel is never ready in a select, which keeps the unbounded wait
	// for callers that pass a nil context.
	var ctxDone <-chan struct{}
	if ctx != nil {
		ctxDone = ctx.Done()
	}

	for _, curr := range events {
		if _, removing := m.removingWatcherEvents[curr.evType]; removing {
			galog.Debugf("Watcher(%s) is being removed, skipping removal request: %s", id, curr.evType)
			continue
		}
		if curr.watcher.ID() != id {
			continue
		}

		m.removingWatcherEvents[curr.evType] = true
		galog.Debugf("Removing watcher: %s, event type: %s", id, curr.evType)

		// Nothing is listening on the removed channel until Run() is called.
		if !m.running.Load() {
			continue
		}

		// Prefer delivering the signal whenever the watcher go routine is
		// already waiting for it, even if ctx happens to be done as well.
		select {
		case curr.removed <- true:
			continue
		default:
		}

		// This send is made while removingWatcherEventsMutex is held; without
		// the ctx escape hatch a watcher go routine that already exited would
		// block this call (and every other user of the mutex) forever.
		select {
		case curr.removed <- true:
		case <-ctxDone:
			galog.Debugf("Gave up signaling watcher: %s, event type: %s: %v", id, curr.evType, ctx.Err())
			// The removal never started, don't leave the event type marked.
			delete(m.removingWatcherEvents, curr.evType)
		}
	}
}
// AddWatcher adds/enables a new watcher. The watcher will be fired up right
// away if the event manager is already running, otherwise it's scheduled to run
// when Run() is called.
//
// ctx is the parent context of the go routines launched for the watcher when
// the manager is already running. An error is returned if a watcher with the
// same ID is currently registered.
func (m *Manager) AddWatcher(ctx context.Context, watcher Watcher) error {
	m.watchersMutex.Lock()
	defer m.watchersMutex.Unlock()

	id := watcher.ID()
	if _, found := m.watchersMap[id]; found {
		return fmt.Errorf("watcher(%s) was previously added", id)
	}
	m.watchersMap[id] = true

	// Ask the watcher for its event types exactly once and keep the entries
	// created from that answer: launching from a second Events() call would
	// depend on both calls returning the same names.
	evNames := watcher.Events()
	added := make([]*WatcherEventType, 0, len(evNames))
	for _, name := range evNames {
		added = append(added, &WatcherEventType{
			watcher: watcher,
			evType:  name,
			removed: make(chan bool),
		})
	}
	m.watcherEvents = append(m.watcherEvents, added...)

	// If we are not running don't bother "running" the watcher, Run() will do it
	// later.
	if !m.running.Load() {
		return nil
	}

	// We are already running, launch one go routine per registered event. Each
	// go routine listens on the removed channel of its own entry, the same
	// channel RemoveWatcher() signals.
	for _, curr := range added {
		galog.Debugf("Adding watcher for event: %s", curr.evType)
		m.queue.add(curr.evType)
		go m.runWatcher(ctx, watcher, curr.evType, curr.removed)
	}
	return nil
}
// runWatcher drives a single (watcher, event type) pair. It keeps calling
// watcher.Run() for as long as the watcher asks to renew, forwarding every
// result to the queue's dataBus, and reports on the queue's watcherDone channel
// once it is finished.
//
// ctx is the parent of the per-watcher context handed to watcher.Run();
// removed is the channel RemoveWatcher() signals to abort this watcher.
func (m *Manager) runWatcher(ctx context.Context, watcher Watcher, evType string, removed chan bool) {
var abort atomic.Bool
nCtx, cancel := context.WithCancel(ctx)
id := watcher.ID()
// Handle watcher removal, another go routine can call RemoveWatcher() and if
// that happens we'll be notified by the removed channel to cancel it. This
// helper receives exactly one value from removed and then leaves.
go func() {
abort.Store(<-removed)
galog.Debugf("Got a request to abort watcher(%s) for event: %s", id, evType)
cancel()
}()
for renew := true; renew; {
var evData any
var err error
renew, evData, err = watcher.Run(nCtx, evType)
galog.Debugf("Watcher(%s) returned event: %q, should renew?: %t", id, evType, renew)
// The result of a Run() that returns after an abort request or after the
// context is done is discarded, subscribers are not notified.
if abort.Load() || nCtx.Err() != nil {
galog.Debugf("Watcher(%s), either are aborting or leaving, breaking renew cycle", id)
break
}
// NOTE(review): unconditional send on an unbuffered channel; it only
// completes while Run()'s dispatching go routine is still receiving —
// confirm this can't be reached after that go routine has returned.
m.queue.dataBus <- eventBusData{
evType: evType,
data: &EventData{
Data: evData,
Error: err,
},
}
}
galog.Debugf("watcher finishing: %s", evType)
// No removal was observed: wake up the helper go routine above ourselves so
// it calls cancel() and releases nCtx.
// NOTE(review): if the helper has already received a value from
// RemoveWatcher() but not yet stored it in abort, this send has no receiver —
// confirm whether that window matters.
if !abort.Load() {
removed <- true
}
// Notify the manager go routine that this watcher has finished so it can
// clean it up (and or finish the event manager if no watcher is left).
// NOTE(review): this is also an unconditional send on an unbuffered channel;
// it blocks for good once Run()'s control go routine has returned — confirm
// leaking this go routine at shutdown is acceptable.
m.queue.watcherDone <- evType
}
// Run runs the event manager, it will block until all watchers have given
// up/failed or ctx is done. The event manager is meant to be started right
// after the early initialization code and live until the application ends, the
// event manager can not be restarted - the Run() method will return an error if
// one tries to run it twice.
//
// ctx is the parent of the context handed to the watchers' go routines and is
// also the context passed to the subscribers' callbacks.
func (m *Manager) Run(ctx context.Context) error {
	galog.Debugf("Starting event manager.")

	// Flip the running flag and snapshot the scheduled watchers while holding
	// watchersMutex. AddWatcher() holds the same mutex from its append until
	// after its running check, so a watcher added concurrently is either part
	// of this snapshot (and launched below) or launched by AddWatcher() itself,
	// never both and never neither.
	m.watchersMutex.Lock()
	if !m.running.CompareAndSwap(false, true) {
		m.watchersMutex.Unlock()
		return fmt.Errorf("tried calling event manager's Run() twice")
	}
	scheduled := m.watcherEvents
	m.watchersMutex.Unlock()

	runCtx, contextCancel := context.WithCancel(ctx)
	// Both control go routines below only leave once runCtx is done, so this is
	// a no-op on return; it just guarantees the cancel func is always released.
	defer contextCancel()

	// Creates a goroutine for each registered watcher's event and keep handling
	// its execution until they give up/finishes their job by returning
	// renew = false.
	for _, curr := range scheduled {
		m.queue.add(curr.evType)
		go m.runWatcher(runCtx, curr.watcher, curr.evType, curr.removed)
	}

	// dispatch calls every subscriber of an event and unsubscribes the ones
	// that asked not to be renewed.
	dispatch := func(busData eventBusData) {
		galog.Debugf("Got event: %s", busData.evType)

		// Callbacks run on a snapshot taken under the read lock so they are free
		// to call Subscribe()/Unsubscribe() themselves.
		m.subscribersMutex.RLock()
		subscribers := m.subscribers[busData.evType]
		m.subscribersMutex.RUnlock()

		if subscribers == nil {
			galog.Debugf("No subscriber found for event: %s, returning.", busData.evType)
			return
		}

		var expired []*EventSubscriber
		for _, curr := range subscribers {
			galog.Debugf("Running event subscribed callback: (event: %q, subscriber: %q)", busData.evType, curr.Name)
			renew := curr.Callback(ctx, busData.evType, curr.Data, busData.data)
			if !renew {
				expired = append(expired, curr)
			}
			galog.Debugf("Returning from event subscribed callback: (event: %q, subscriber: %q, should renew?: %t)", busData.evType, curr.Name, renew)
		}

		for _, curr := range expired {
			m.Unsubscribe(busData.evType, curr.Name)
		}
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// Manages the event processing avoiding blocking the watcher's go routines.
	// This will listen to dataBus and call the events handlers/callbacks.
	go func() {
		defer wg.Done()
		for {
			select {
			case <-runCtx.Done():
				return
			case busData := <-m.queue.dataBus:
				dispatch(busData)
			}
		}
	}()

	// Controls the completion of the watcher go routines, their removal from the
	// queue and signals to context & callback control go routines about watchers
	// completion.
	//
	// Note that when no watcher is queued at this point the loop body never
	// runs: this go routine leaves without canceling runCtx and Run() keeps
	// blocking until ctx is done.
	go func() {
		defer wg.Done()
		for remaining := m.queue.length(); remaining > 0 && runCtx.Err() == nil; {
			select {
			case <-runCtx.Done():
				return
			case evType := <-m.queue.watcherDone:
				remaining = m.queue.del(evType)
				m.deleteRemovingEvent(evType)
				if remaining == 0 {
					galog.Debugf("All watchers are finished, signaling to leave.")
					contextCancel()
					return
				}
			}
		}
	}()

	wg.Wait()
	return nil
}