in pkg/controller/manager.go [83:173]
// Run is the action manager's main loop. It receives intents from am.inputs,
// buffers them in an internal bounded queue, and for each queued intent runs
// a policy check before handing permitted intents to am.takeAction.
//
// As the internal queue fills up, incoming intents are admitted more
// selectively: below queueSkipThreshold everything is queued; at or above it,
// intents for which isClusterActive reports true are always queued,
// intents for which isLowPriority reports true are randomly dropped, and
// nothing is queued once the queue holds maxQueuedIntents entries.
//
// Run returns nil when ctx is done. It returns a non-nil error if am.inputs
// is closed, because no further intents can ever arrive.
func (am *actionManager) Run(ctx context.Context) error {
	am.log.Debug("starting")
	defer am.log.Debug("finished")

	// queuedIntents is created here, is only sent to and received from within
	// this loop, and is never closed, so receives need no closed-channel check.
	queuedIntents := make(chan *intent.Intent, maxQueuedIntents)

	// TODO: split out accepted intent handler - it should handle its
	// prioritization as needed to ensure that active nodes' events reach it.

	for {
		// Handle active intents
		select {
		case <-ctx.Done():
			return nil

		case qin := <-queuedIntents:
			log := am.log.WithFields(logfields.Intent(qin))
			log.Debug("checking with policy")
			// TODO: make policy checking and consideration richer
			pview, err := am.makePolicyCheck(qin)
			if err != nil {
				log.WithError(err).Error("policy unenforceable")
				continue
			}
			proceed, err := am.policy.Check(pview)
			if err != nil {
				log.WithError(err).Error("policy check errored")
				continue
			}
			if !proceed {
				log.Debug("policy denied intent")
				continue
			}
			log.Debug("handling permitted intent")
			am.takeAction(qin)

		case input, ok := <-am.inputs:
			if !ok {
				// A closed channel is always ready to receive, so staying in
				// the loop would spin on this case. A bare `break` here would
				// only leave the select, not the for loop, hence the return.
				am.log.Error("input channel closed")
				return fmt.Errorf("input channel closed")
			}

			queued := len(queuedIntents)
			log := am.log.WithFields(logfields.Intent(input)).
				WithFields(logrus.Fields{
					"queue":        "process",
					"queue-length": fmt.Sprintf("%d", queued),
				})

			// Plenty of room: queue unconditionally.
			if queued < queueSkipThreshold {
				queuedIntents <- input
				continue
			}

			// Start dropping if it's not possible to queue at all.
			if queued+1 > maxQueuedIntents {
				log.Warn("queue full, dropping intent this try")
				continue
			}

			// TODO: handle backpressure better with rescheduling instead of drops

			// Queue is getting full, let's be more selective about events that
			// are propagated.

			if isClusterActive(input) {
				log.Info("queue active intent")
				queuedIntents <- input
				continue
			}

			if isLowPriority(input) {
				n := randDropIntFunc(10)
				willDrop := n%2 == 0
				if willDrop {
					// Intent is picked up again when cached Intent expires &
					// Informer syncs OR if the Intent is changed (from update
					// or otherwise by Node). This provides indirect
					// backpressure by delaying the next time the Intent will be
					// handled.
					log.Warn("queue backlog high, randomly dropping intent")
					continue
				}
			}

			log.Debug("queue intent")
			queuedIntents <- input
		}
	}
}