google_guest_agent/events/events.go (303 lines of code) (raw):
// Copyright 2023 Google LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package events is a events processing layer.
package events
import (
"context"
"fmt"
"sync"
"github.com/GoogleCloudPlatform/guest-agent/google_guest_agent/events/metadata"
"github.com/GoogleCloudPlatform/guest-logging-go/logger"
)
var (
defaultWatchers = []Watcher{
metadata.New(),
}
instance *Manager
)
// Watcher defines the interface between the events manager and the actual
// watcher implementation.
type Watcher interface {
// ID returns the watcher id.
ID() string
// Events return a slice with all the event types a given Watcher handles.
Events() []string
// Run implements the actuall "listening" strategy and emits a event "signal".
// It must return:
// - [bool] if the watcher should renew(run again).
// - [interface{}] a event context data pointer further describing the event(if needed).
// - [err] error case the Watcher failed and wants to notify subscribers(see EventData).
Run(ctx context.Context, evType string) (bool, interface{}, error)
}
// Manager defines the interface between events management layer and the
// core guest agent implementation.
type Manager struct {
// watcherEvents maps the registered watchers and their events.
watcherEvents []*WatcherEventType
// watchersMap is a convenient manager's mapping of registered watcher instances.
watchersMap map[string]bool
// watchersMutex protects the watchers map.
watchersMutex sync.Mutex
// removingWatcherEvents is a map of watchers being removed.
removingWatcherEvents map[string]bool
// running is a flag indicating if the Run() was previously called.
running bool
// runningMutex protects the running flag.
runningMutex sync.RWMutex
// subscribers maps the subscribed callbacks.
subscribers map[string][]*eventSubscriber
// subscribersMutex protects subscribers member/map of the manager object.
subscribersMutex sync.Mutex
// queue queue struct manages the running watchers, when it gets to len()
// down to zero means all watchers are done and we can signal the other
// control go routines to leave(given we don't have any more job left to
// process).
queue *watcherQueue
}
// watcherQueue wraps the watchers <-> callbacks communication as well as the
// communication/coordination of the multiple control go routine i.e. the one
// responsible to calling callbacks after a event is produced by the watcher etc.
type watcherQueue struct {
// queueMutex protects the access to watchersMap
queueMutex sync.RWMutex
// watchersMap maps the currently running watchers.
watchersMap map[string]bool
// finishContextHandler is a channel used to communicate with the context handling
// go routine that it should finish/end its job (usually after all watchers are done).
finishContextHandler chan bool
// finishCallbackHandler is a channel used to communicate with the callback handling
// go routine that it should finish/end its job (usually after all watchers are done).
finishCallbackHandler chan bool
// watcherDone is a channel used to communicate that a given watcher is finished/done.
watcherDone chan string
// dataBus is the channel used to communicate between watchers (event producer) and the
// callback handler (event consumer managing go routine).
dataBus chan eventBusData
// leaving is a flag that indicates no more job should be processed as we are done
// with all watchers and callbacks.
leaving bool
}
// EventData wraps the data communicated from a Watcher to a Subscriber.
type EventData struct {
// Data points to the Watcher provided data.
Data interface{}
// Error is used when a Watcher has failed and wants communicate its subscribers about the error.
Error error
}
// WatcherEventType wraps/couples together a Watcher and an event type.
type WatcherEventType struct {
// watcher is the watcher implementation for a given event type.
watcher Watcher
// evType idenfities the event type this object refences to.
evType string
// removed is a channel used to communicate with the running watcher go routine
// that it shouldn't renew even if the watcher requested a renew (in response of a
// RemoveWatcher() call.
removed chan bool
}
type eventSubscriber struct {
data interface{}
cb *EventCb
}
type eventBusData struct {
evType string
data *EventData
}
// EventCb defines the callback interface between watchers and subscribers. The arguments are:
// - ctx the app' context passed in from the manager's Run() call.
// - evType a string defining the what event type triggered the call.
// - data a user context pointer to be consumed by the callback.
// - evData a event specific data pointer.
//
// The callback should return true if it wants to renew, returning false will case the callback
// to be unregistered/unsubscribed.
type EventCb func(ctx context.Context, evType string, data interface{}, evData *EventData) bool
// length returns how many watchers are currently running.
func (ep *watcherQueue) length() int {
ep.queueMutex.RLock()
defer ep.queueMutex.RUnlock()
return len(ep.watchersMap)
}
// add adds a new watcher to the queue.
func (ep *watcherQueue) add(evType string) {
ep.queueMutex.Lock()
defer ep.queueMutex.Unlock()
ep.watchersMap[evType] = true
}
// del removes a watcher from the queue.
func (ep *watcherQueue) del(evType string) int {
ep.queueMutex.Lock()
defer ep.queueMutex.Unlock()
delete(ep.watchersMap, evType)
return len(ep.watchersMap)
}
// AddDefaultWatchers add the default watchers:
// - metadata
func (mngr *Manager) AddDefaultWatchers(ctx context.Context) error {
for _, curr := range defaultWatchers {
if err := mngr.AddWatcher(ctx, curr); err != nil {
return err
}
}
return nil
}
// newManager allocates and initializes a events Manager.
func newManager() *Manager {
return &Manager{
watchersMap: make(map[string]bool),
removingWatcherEvents: make(map[string]bool),
subscribers: make(map[string][]*eventSubscriber),
queue: &watcherQueue{
watchersMap: make(map[string]bool),
dataBus: make(chan eventBusData),
finishCallbackHandler: make(chan bool),
finishContextHandler: make(chan bool),
watcherDone: make(chan string),
},
}
}
func init() {
instance = newManager()
}
// Get allocates a new manager if one doesn't exists or returns the one previously allocated.
func Get() *Manager {
if instance == nil {
panic("The event's manager instance should had being initialized.")
}
return instance
}
// Subscribe registers an event consumer/subscriber callback to a given event type, data
// is a context pointer provided by the caller to be passed down when calling cb when
// a new event happens.
func (mngr *Manager) Subscribe(evType string, data interface{}, cb EventCb) {
mngr.subscribersMutex.Lock()
defer mngr.subscribersMutex.Unlock()
mngr.subscribers[evType] = append(mngr.subscribers[evType],
&eventSubscriber{
data: data,
cb: &cb,
},
)
}
func (mngr *Manager) unsubscribe(evType string, cb *EventCb) {
var keepMe []*eventSubscriber
for _, curr := range mngr.subscribers[evType] {
if curr.cb != cb {
keepMe = append(keepMe, curr)
}
}
mngr.subscribers[evType] = keepMe
if len(keepMe) == 0 {
logger.Debugf("No more subscribers left for evType: %s", evType)
delete(mngr.subscribers, evType)
}
}
// Unsubscribe removes the subscription of a given callback for a given event type.
func (mngr *Manager) Unsubscribe(evType string, cb EventCb) {
mngr.subscribersMutex.Lock()
defer mngr.subscribersMutex.Unlock()
mngr.unsubscribe(evType, &cb)
}
// RemoveWatcher removes a watcher from the event manager. Each running watcher has its own
// context (derived from the one provided in the AddWatcher() call) and will have it canceled
// after calling this method.
func (mngr *Manager) RemoveWatcher(ctx context.Context, watcher Watcher) error {
mngr.watchersMutex.Lock()
defer mngr.watchersMutex.Unlock()
id := watcher.ID()
logger.Debugf("Got a request to remove watcher: %s", id)
if _, found := mngr.watchersMap[id]; !found {
return fmt.Errorf("unknown Watcher(%s)", id)
}
for _, curr := range mngr.watcherEvents {
if _, found := mngr.removingWatcherEvents[curr.evType]; found {
logger.Debugf("Watcher(%s) is being removed, skipping removal request: %s", id, curr.evType)
continue
}
if curr.watcher.ID() == id {
mngr.removingWatcherEvents[curr.evType] = true
logger.Debugf("Removing watcher: %s, event type: %s", id, curr.evType)
curr.removed <- true
}
}
return nil
}
// AddWatcher adds/enables a new watcher. The watcher will be fired up right away if the
// event manager is already running, otherwise it's scheduled to run when Run() is called.
func (mngr *Manager) AddWatcher(ctx context.Context, watcher Watcher) error {
mngr.watchersMutex.Lock()
defer mngr.watchersMutex.Unlock()
id := watcher.ID()
if _, found := mngr.watchersMap[id]; found {
return fmt.Errorf("watcher(%s) was previously added", id)
}
// Add the watchers and its events to internal mappings.
evTypes := make(map[string]*WatcherEventType)
mngr.watchersMap[id] = true
for _, curr := range watcher.Events() {
evType := &WatcherEventType{
watcher: watcher,
evType: curr,
removed: make(chan bool),
}
evTypes[curr] = evType
mngr.watcherEvents = append(mngr.watcherEvents, evType)
}
mngr.runningMutex.RLock()
defer mngr.runningMutex.RUnlock()
// If we are not running don't bother "running" the watcher, Run() will do it later.
if !mngr.running {
return nil
}
// If we are already running the "run/launch" the watcher.
for _, curr := range watcher.Events() {
logger.Debugf("Adding watcher for event: %s", curr)
mngr.queue.add(curr)
go func(watcher Watcher, evType string, removed chan bool) {
mngr.runWatcher(ctx, watcher, evType, removed)
}(watcher, curr, evTypes[curr].removed)
}
return nil
}
func (mngr *Manager) runWatcher(ctx context.Context, watcher Watcher, evType string, removed chan bool) {
nCtx, cancel := context.WithCancel(ctx)
abort := false
id := watcher.ID()
go func() {
abort = <-removed
logger.Debugf("Got a request to abort watcher(%s) for event: %s", id, evType)
cancel()
}()
for renew := true; renew; {
var evData interface{}
var err error
renew, evData, err = watcher.Run(nCtx, evType)
logger.Debugf("Watcher(%s) returned event: %q, should renew?: %t", id, evType, renew)
if abort || mngr.queue.leaving {
logger.Debugf("Watcher(%s), either are aborting(%t) or leaving(%t), breaking renew cycle",
id, abort, mngr.queue.leaving)
break
}
mngr.queue.dataBus <- eventBusData{
evType: evType,
data: &EventData{
Data: evData,
Error: err,
},
}
}
logger.Debugf("watcher finishing: %s", evType)
if !abort {
removed <- true
}
mngr.queue.watcherDone <- evType
}
// Run runs the event manager, it will block until all watchers have given up/failed.
// The event manager is meant to be started right after the early initialization code
// and live until the application ends, the event manager can not be restarted - the Run()
// method will return an error if one tries to run it twice.
func (mngr *Manager) Run(ctx context.Context) error {
var wg sync.WaitGroup
mngr.runningMutex.Lock()
if mngr.running {
mngr.runningMutex.Unlock()
return fmt.Errorf("tried calling event manager's Run() twice")
}
mngr.running = true
mngr.runningMutex.Unlock()
queue := mngr.queue
// Manages the context's done signal, pass it down to the other go routines to
// finish its job and leave. Additionally, if the remaining go routines are leaving
// we get it handled via dataBus channel and drop this go routine as well.
wg.Add(1)
go func(done <-chan struct{}, finishContextHandler <-chan bool, finishCallbackHandler chan<- bool) {
defer wg.Done()
for {
select {
case <-done:
logger.Debugf("Got context's Done() signal, leaving.")
queue.leaving = true
finishCallbackHandler <- true
return
case <-finishContextHandler:
logger.Debugf("Got context handler finish signal, leaving.")
queue.leaving = true
return
}
}
}(ctx.Done(), queue.finishContextHandler, queue.finishCallbackHandler)
// Manages the event processing avoiding blocking the watcher's go routines.
// This will listen to dataBus and call the events handlers/callbacks.
wg.Add(1)
go func(bus <-chan eventBusData, finishCallbackHandler <-chan bool) {
defer wg.Done()
for {
select {
case <-finishCallbackHandler:
return
case busData := <-bus:
subscribers := mngr.subscribers[busData.evType]
if subscribers == nil {
logger.Debugf("No subscriber found for event: %s, returning.", busData.evType)
continue
}
deleteMe := make([]*eventSubscriber, 0)
for _, curr := range subscribers {
logger.Debugf("Running registered callback for event: %s", busData.evType)
renew := (*curr.cb)(ctx, busData.evType, curr.data, busData.data)
if !renew {
deleteMe = append(deleteMe, curr)
}
logger.Debugf("Returning from event %q subscribed callback, should renew?: %t", busData.evType, renew)
}
mngr.subscribersMutex.Lock()
for _, curr := range deleteMe {
mngr.unsubscribe(busData.evType, curr.cb)
}
leave := mngr.subscribers[busData.evType] == nil
mngr.subscribersMutex.Unlock()
// No more subscribers at all, we have nothing more left to do here.
if leave {
logger.Debugf("No subscribers left, leaving")
break
}
}
}
}(queue.dataBus, queue.finishCallbackHandler)
// Creates a goroutine for each registered watcher's event and keep handling its
// execution until they give up/finishes their job by returning renew = false.
for _, curr := range mngr.watcherEvents {
queue.add(curr.evType)
go func(watcher Watcher, evType string, removed chan bool) {
mngr.runWatcher(ctx, watcher, evType, removed)
}(curr.watcher, curr.evType, curr.removed)
}
// Controls the completion of the watcher go routines, their removal from the queue
// and signals to context & callback control go routines about watchers completion.
wg.Add(1)
go func() {
defer wg.Done()
for len := queue.length(); len > 0; {
doneStr := <-queue.watcherDone
len = queue.del(doneStr)
delete(mngr.removingWatcherEvents, doneStr)
if !queue.leaving && len == 0 {
logger.Debugf("All watchers are finished, signaling to leave.")
queue.finishContextHandler <- true
queue.finishCallbackHandler <- true
}
}
}()
wg.Wait()
return nil
}