in google_guest_agent/events/events.go [370:477]
func (mngr *Manager) Run(ctx context.Context) error {
var wg sync.WaitGroup
mngr.runningMutex.Lock()
if mngr.running {
mngr.runningMutex.Unlock()
return fmt.Errorf("tried calling event manager's Run() twice")
}
mngr.running = true
mngr.runningMutex.Unlock()
queue := mngr.queue
// Manages the context's done signal, pass it down to the other go routines to
// finish its job and leave. Additionally, if the remaining go routines are leaving
// we get it handled via dataBus channel and drop this go routine as well.
wg.Add(1)
go func(done <-chan struct{}, finishContextHandler <-chan bool, finishCallbackHandler chan<- bool) {
defer wg.Done()
for {
select {
case <-done:
logger.Debugf("Got context's Done() signal, leaving.")
queue.leaving = true
finishCallbackHandler <- true
return
case <-finishContextHandler:
logger.Debugf("Got context handler finish signal, leaving.")
queue.leaving = true
return
}
}
}(ctx.Done(), queue.finishContextHandler, queue.finishCallbackHandler)
// Manages the event processing avoiding blocking the watcher's go routines.
// This will listen to dataBus and call the events handlers/callbacks.
wg.Add(1)
go func(bus <-chan eventBusData, finishCallbackHandler <-chan bool) {
defer wg.Done()
for {
select {
case <-finishCallbackHandler:
return
case busData := <-bus:
subscribers := mngr.subscribers[busData.evType]
if subscribers == nil {
logger.Debugf("No subscriber found for event: %s, returning.", busData.evType)
continue
}
deleteMe := make([]*eventSubscriber, 0)
for _, curr := range subscribers {
logger.Debugf("Running registered callback for event: %s", busData.evType)
renew := (*curr.cb)(ctx, busData.evType, curr.data, busData.data)
if !renew {
deleteMe = append(deleteMe, curr)
}
logger.Debugf("Returning from event %q subscribed callback, should renew?: %t", busData.evType, renew)
}
mngr.subscribersMutex.Lock()
for _, curr := range deleteMe {
mngr.unsubscribe(busData.evType, curr.cb)
}
leave := mngr.subscribers[busData.evType] == nil
mngr.subscribersMutex.Unlock()
// No more subscribers at all, we have nothing more left to do here.
if leave {
logger.Debugf("No subscribers left, leaving")
break
}
}
}
}(queue.dataBus, queue.finishCallbackHandler)
// Creates a goroutine for each registered watcher's event and keep handling its
// execution until they give up/finishes their job by returning renew = false.
for _, curr := range mngr.watcherEvents {
queue.add(curr.evType)
go func(watcher Watcher, evType string, removed chan bool) {
mngr.runWatcher(ctx, watcher, evType, removed)
}(curr.watcher, curr.evType, curr.removed)
}
// Controls the completion of the watcher go routines, their removal from the queue
// and signals to context & callback control go routines about watchers completion.
wg.Add(1)
go func() {
defer wg.Done()
for len := queue.length(); len > 0; {
doneStr := <-queue.watcherDone
len = queue.del(doneStr)
delete(mngr.removingWatcherEvents, doneStr)
if !queue.leaving && len == 0 {
logger.Debugf("All watchers are finished, signaling to leave.")
queue.finishContextHandler <- true
queue.finishCallbackHandler <- true
}
}
}()
wg.Wait()
return nil
}