func()

in google_guest_agent/events/events.go [370:477]


func (mngr *Manager) Run(ctx context.Context) error {
	var wg sync.WaitGroup

	mngr.runningMutex.Lock()
	if mngr.running {
		mngr.runningMutex.Unlock()
		return fmt.Errorf("tried calling event manager's Run() twice")
	}
	mngr.running = true
	mngr.runningMutex.Unlock()

	queue := mngr.queue

	// Manages the context's done signal, pass it down to the other go routines to
	// finish its job and leave. Additionally, if the remaining go routines are leaving
	// we get it handled via dataBus channel and drop this go routine as well.
	wg.Add(1)
	go func(done <-chan struct{}, finishContextHandler <-chan bool, finishCallbackHandler chan<- bool) {
		defer wg.Done()

		for {
			select {
			case <-done:
				logger.Debugf("Got context's Done() signal, leaving.")
				queue.leaving = true
				finishCallbackHandler <- true
				return
			case <-finishContextHandler:
				logger.Debugf("Got context handler finish signal, leaving.")
				queue.leaving = true
				return
			}
		}
	}(ctx.Done(), queue.finishContextHandler, queue.finishCallbackHandler)

	// Manages the event processing avoiding blocking the watcher's go routines.
	// This will listen to dataBus and call the events handlers/callbacks.
	wg.Add(1)
	go func(bus <-chan eventBusData, finishCallbackHandler <-chan bool) {
		defer wg.Done()

		for {
			select {
			case <-finishCallbackHandler:
				return
			case busData := <-bus:
				subscribers := mngr.subscribers[busData.evType]
				if subscribers == nil {
					logger.Debugf("No subscriber found for event: %s, returning.", busData.evType)
					continue
				}

				deleteMe := make([]*eventSubscriber, 0)
				for _, curr := range subscribers {
					logger.Debugf("Running registered callback for event: %s", busData.evType)
					renew := (*curr.cb)(ctx, busData.evType, curr.data, busData.data)
					if !renew {
						deleteMe = append(deleteMe, curr)
					}
					logger.Debugf("Returning from event %q subscribed callback, should renew?: %t", busData.evType, renew)
				}

				mngr.subscribersMutex.Lock()
				for _, curr := range deleteMe {
					mngr.unsubscribe(busData.evType, curr.cb)
				}
				leave := mngr.subscribers[busData.evType] == nil
				mngr.subscribersMutex.Unlock()

				// No more subscribers at all, we have nothing more left to do here.
				if leave {
					logger.Debugf("No subscribers left, leaving")
					break
				}
			}
		}
	}(queue.dataBus, queue.finishCallbackHandler)

	// Creates a goroutine for each registered watcher's event and keep handling its
	// execution until they give up/finishes their job by returning renew = false.
	for _, curr := range mngr.watcherEvents {
		queue.add(curr.evType)
		go func(watcher Watcher, evType string, removed chan bool) {
			mngr.runWatcher(ctx, watcher, evType, removed)
		}(curr.watcher, curr.evType, curr.removed)
	}

	// Controls the completion of the watcher go routines, their removal from the queue
	// and signals to context & callback control go routines about watchers completion.
	wg.Add(1)
	go func() {
		defer wg.Done()

		for len := queue.length(); len > 0; {
			doneStr := <-queue.watcherDone
			len = queue.del(doneStr)
			delete(mngr.removingWatcherEvents, doneStr)
			if !queue.leaving && len == 0 {
				logger.Debugf("All watchers are finished, signaling to leave.")
				queue.finishContextHandler <- true
				queue.finishCallbackHandler <- true
			}
		}
	}()

	wg.Wait()
	return nil
}