in internal/events/events.go [380:460]
// Run starts the event manager and blocks until its internal goroutines have
// finished. It launches one goroutine per registered watcher event, one
// goroutine that dispatches incoming events to their subscribers, and one
// goroutine that tracks watcher completion.
//
// Run returns an error if it has already been called on this manager;
// otherwise it returns nil once ctx is canceled or the last queued watcher
// has reported completion.
func (m *Manager) Run(ctx context.Context) error {
	galog.Debugf("Starting event manager.")
	if !m.running.CompareAndSwap(false, true) {
		return fmt.Errorf("tried calling event manager's Run() twice")
	}

	runCtx, contextCancel := context.WithCancel(ctx)
	// Release the derived context on every return path. Calling the cancel
	// function again after the completion goroutine already did is a no-op.
	defer contextCancel()

	// Creates a goroutine for each registered watcher's event and keeps handling
	// its execution until the watcher gives up/finishes its job by returning
	// renew = false.
	for _, curr := range m.watcherEvents {
		m.queue.add(curr.evType)
		go m.runWatcher(runCtx, curr.watcher, curr.evType, curr.removed)
	}

	var wg sync.WaitGroup

	// Manages the event processing, avoiding blocking the watchers' goroutines.
	// This listens to dataBus and calls the event handlers/callbacks.
	wg.Add(1)
	go func() {
		defer wg.Done()

		for {
			select {
			case <-runCtx.Done():
				return
			case busData := <-m.queue.dataBus:
				galog.Debugf("Got event: %s", busData.evType)

				// Only the lookup is done under the lock so that callbacks are
				// free to call back into the manager (e.g. Unsubscribe below).
				m.subscribersMutex.RLock()
				subscribers := m.subscribers[busData.evType]
				m.subscribersMutex.RUnlock()

				if len(subscribers) == 0 {
					galog.Debugf("No subscriber found for event: %s, returning.", busData.evType)
					continue
				}

				// Subscribers whose callback asked not to be renewed; they are
				// unsubscribed only after the whole batch has been notified.
				var expired []*EventSubscriber
				for _, curr := range subscribers {
					galog.Debugf("Running event subscribed callback: (event: %q, subscriber: %q)", busData.evType, curr.Name)
					// NOTE(review): callbacks receive the caller's ctx rather than
					// runCtx, so they are not canceled when all watchers finish —
					// confirm this is intended.
					renew := (curr.Callback)(ctx, busData.evType, curr.Data, busData.data)
					if !renew {
						expired = append(expired, curr)
					}
					galog.Debugf("Returning from event subscribed callback: (event: %q, subscriber: %q, should renew?: %t)", busData.evType, curr.Name, renew)
				}

				for _, curr := range expired {
					m.Unsubscribe(busData.evType, curr.Name)
				}
			}
		}
	}()

	// Controls the completion of the watcher goroutines, their removal from the
	// queue, and signals the dispatch goroutine (through runCtx) once every
	// watcher has completed.
	wg.Add(1)
	go func() {
		defer wg.Done()

		// NOTE(review): if no watcher is queued when Run starts, this goroutine
		// exits without canceling runCtx, so Run keeps blocking until ctx is
		// canceled — confirm this is intended.
		remaining := m.queue.length()
		for remaining > 0 && runCtx.Err() == nil {
			select {
			case <-runCtx.Done():
				return
			case evType := <-m.queue.watcherDone:
				// The value returned by del is used as the number of watchers
				// still queued.
				remaining = m.queue.del(evType)
				m.deleteRemovingEvent(evType)
				if remaining == 0 {
					galog.Debugf("All watchers are finished, signaling to leave.")
					contextCancel()
					return
				}
			}
		}
	}()

	wg.Wait()
	return nil
}