func()

in pkg/controller/manager.go [83:173]


// Run is the manager's main loop. It receives intents from am.inputs, admits
// them into an internal bounded queue (dropping some under backlog), and hands
// queued intents that pass the policy check to am.takeAction.
//
// Run blocks until ctx is done and then returns nil; it does not return a
// non-nil error. If the input channel is closed, Run logs that once, stops
// reading inputs, and keeps handling already queued intents until ctx is done.
func (am *actionManager) Run(ctx context.Context) error {
	am.log.Debug("starting")
	defer am.log.Debug("finished")

	// queuedIntents is only ever sent to and received from by this goroutine
	// and is never closed, so len(queuedIntents) is exact and a receive from
	// it always yields a real queued intent.
	queuedIntents := make(chan *intent.Intent, maxQueuedIntents)

	// inputs is a loop-local handle on the input channel so that it can be set
	// to nil once closed. Receiving from a nil channel blocks forever, which
	// disables that select case instead of spinning on the closed channel.
	// NOTE(review): am.inputs is read once here; this assumes the field is not
	// reassigned while Run is active - confirm.
	inputs := am.inputs

	// TODO: split out accepted intent handler - it should handle its
	// prioritization as needed to ensure that active nodes' events reach it.

	for {
		select {
		case <-ctx.Done():
			return nil

		// Handle active intents
		case qin := <-queuedIntents:
			log := am.log.WithFields(logfields.Intent(qin))
			log.Debug("checking with policy")
			// TODO: make policy checking and consideration richer
			pview, err := am.makePolicyCheck(qin)
			if err != nil {
				log.WithError(err).Error("policy unenforceable")
				continue
			}
			proceed, err := am.policy.Check(pview)
			if err != nil {
				log.WithError(err).Error("policy check errored")
				continue
			}
			if !proceed {
				log.Debug("policy denied intent")
				continue
			}
			log.Debug("handling permitted intent")
			am.takeAction(qin)

		case input, ok := <-inputs:
			if !ok {
				// A bare break here would only leave the select and the closed
				// channel would be selected again immediately, busy-looping.
				am.log.Error("input channel closed")
				inputs = nil
				continue
			}

			queued := len(queuedIntents)
			log := am.log.WithFields(logfields.Intent(input)).
				WithFields(logrus.Fields{
					"queue":        "process",
					"queue-length": fmt.Sprintf("%d", queued),
				})

			// Start dropping if its not possible to queue at all. This is
			// checked before any send: this goroutine is also the queue's only
			// consumer, so a send on a full queue would block forever.
			if queued+1 > maxQueuedIntents {
				log.Warn("queue full, dropping intent this try")
				continue
			}

			// Plenty of room, queue without further consideration.
			if queued < queueSkipThreshold {
				queuedIntents <- input
				continue
			}

			// TODO: handle backpressure better with rescheduling instead of drops

			// Queue is getting full, let's be more selective about events that
			// are propagated.

			if isClusterActive(input) {
				log.Info("queue active intent")
				queuedIntents <- input
				continue
			}

			if isLowPriority(input) {
				// Drop when the drawn number is even.
				if randDropIntFunc(10)%2 == 0 {
					// Intent is picked up again when cached Intent expires &
					// Informer syncs OR if the Intent is changed (from update
					// or otherwise by Node). This provides indirect
					// backpressure by delaying the next time the Intent will be
					// handled.
					log.Warn("queue backlog high, randomly dropping intent")
					continue
				}
			}
			log.Debug("queue intent")
			queuedIntents <- input
		}
	}
}