func()

in internal/events/events.go [380:460]


// Run starts the event manager and blocks until its internal goroutines have
// exited. That happens either when ctx is cancelled or when the watcher queue
// becomes empty (every watcher has reported completion, or no watcher was
// registered in the first place).
//
// Run returns an error if the manager is already marked as running; otherwise
// it returns nil. The running flag is not cleared by this function.
func (m *Manager) Run(ctx context.Context) error {
	var wg sync.WaitGroup

	galog.Debugf("Starting event manager.")
	if !m.running.CompareAndSwap(false, true) {
		return fmt.Errorf("tried calling event manager's Run() twice")
	}

	runCtx, contextCancel := context.WithCancel(ctx)
	// Cancelling more than once is harmless; deferring here guarantees that
	// runCtx is released on every path out of Run.
	defer contextCancel()

	// Creates a goroutine for each registered watcher's event and keeps handling
	// its execution until it gives up/finishes its job by returning
	// renew = false.
	for _, curr := range m.watcherEvents {
		m.queue.add(curr.evType)
		go m.runWatcher(runCtx, curr.watcher, curr.evType, curr.removed)
	}

	// Manages the event processing, avoiding blocking the watchers' goroutines.
	// This listens to dataBus and calls the event handlers/callbacks.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-runCtx.Done():
				return
			case busData := <-m.queue.dataBus:
				galog.Debugf("Got event: %s", busData.evType)

				// Look the subscribers up under the read lock; the callbacks
				// below run without the lock held.
				m.subscribersMutex.RLock()
				subscribers := m.subscribers[busData.evType]
				m.subscribersMutex.RUnlock()

				if subscribers == nil {
					galog.Debugf("No subscriber found for event: %s, returning.", busData.evType)
					continue
				}

				// Subscribers whose callback asked not to be renewed are
				// collected first and unsubscribed after the iteration.
				var expired []*EventSubscriber
				for _, sub := range subscribers {
					galog.Debugf("Running event subscribed callback: (event: %q, subscriber: %q)", busData.evType, sub.Name)
					// Callbacks receive the caller's ctx, not runCtx.
					renew := (sub.Callback)(ctx, busData.evType, sub.Data, busData.data)
					if !renew {
						expired = append(expired, sub)
					}
					galog.Debugf("Returning from event subscribed callback: (event: %q, subscriber: %q, should renew?: %t)", busData.evType, sub.Name, renew)
				}

				for _, sub := range expired {
					m.Unsubscribe(busData.evType, sub.Name)
				}
			}
		}
	}()

	// Controls the completion of the watcher goroutines and their removal from
	// the queue, and signals the other goroutines once no watcher is left.
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Whatever the reason this goroutine leaves (context done, all
		// watchers finished, or no watcher registered at all), make sure the
		// dispatch goroutine above is released so that Run can return.
		defer contextCancel()

		remaining := m.queue.length()
		for remaining > 0 {
			if runCtx.Err() != nil {
				return
			}
			select {
			case <-runCtx.Done():
				return
			case evType := <-m.queue.watcherDone:
				remaining = m.queue.del(evType)
				m.deleteRemovingEvent(evType)
			}
		}
		galog.Debugf("All watchers are finished, signaling to leave.")
	}()

	wg.Wait()
	return nil
}