internal/controllers/scheduling/controller.go (166 lines of code) (raw):
package scheduling
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"hash/fnv"
"sort"
"time"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/manager"
)
const (
synthEpochAnnotationKey = "eno.azure.io/global-synthesizer-epoch"
)
// controller is responsible for carefully dispatching synthesis operations.
//
// Dispatching synthesis consists of swapping any existing synthesis state to the previous slot,
// which signals to other controllers that a new synthesis is needed. This controller will swap
// the states when the resulting synthesis operation will not cause the cluster-wide concurrency
// limit to be exceeded.
//
// Synthesis is dispatched when the composition spec is modified or when the inputs or synthesizer
// have changed. Deferred inputs and synthesizer changes are subject to a cluster-wide "cooldown period"
// to hedge against bad changes.
//
// The implementation is completely deterministic i.e. given a set of compositions and synthesizers,
// it will always produce the same synthesis order, even if two controllers think they are the current
// leader AND one of them has a newer composition or synthesizer in its informer cache.
//
// Rollout order for synthesizer changes is unique to the generation of the synthesizer.
// Compositions will not receive the new synthesizer in the same order for every generation, but
// the same generation will always roll out in the same order.
type controller struct {
client client.Client
concurrencyLimit int
cooldownPeriod time.Duration
cacheGracePeriod time.Duration
watchdogThreshold time.Duration
lastApplied *op
}
func NewController(mgr ctrl.Manager, concurrencyLimit int, cooldown, watchdogThreshold time.Duration) error {
c := &controller{
client: mgr.GetClient(),
concurrencyLimit: concurrencyLimit,
cooldownPeriod: cooldown,
cacheGracePeriod: time.Second,
watchdogThreshold: watchdogThreshold,
}
// Non-leaders should report that all slots are available, not zero.
freeSynthesisSlots.Set(float64(concurrencyLimit))
return ctrl.NewControllerManagedBy(mgr).
Named("schedulingController").
Watches(&apiv1.Composition{}, manager.SingleEventHandler()).
Watches(&apiv1.Synthesizer{}, manager.SingleEventHandler()).
WithLogConstructor(manager.NewLogConstructor(mgr, "schedulingController")).
Complete(c)
}
func (c *controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := logr.FromContextOrDiscard(ctx)
start := time.Now()
defer func() {
schedulingLatency.Observe(time.Since(start).Seconds())
}()
// Avoid conflict errors by waiting until we see the last dispatched synthesis (or timeout)
if c.lastApplied != nil {
ok, wait, err := c.lastApplied.HasBeenPatched(ctx, c.client, c.cacheGracePeriod)
if err != nil {
logger.Error(err, "checking cache for previous op")
return ctrl.Result{}, err
}
if !ok {
logger.V(1).Info("waiting for cache to reflect previous operation")
return ctrl.Result{RequeueAfter: wait}, nil
}
c.lastApplied = nil
}
synths := &apiv1.SynthesizerList{}
err := c.client.List(ctx, synths)
if err != nil {
logger.Error(err, "failed to list synthesizers")
return ctrl.Result{}, err
}
synthsByName, synthEpoch := indexSynthesizers(synths.Items)
comps := &apiv1.CompositionList{}
err = c.client.List(ctx, comps)
if err != nil {
logger.Error(err, "failed to list compositions")
return ctrl.Result{}, err
}
nextSlot := c.getNextCooldownSlot(comps)
var inFlight int
var op *op
for _, comp := range comps.Items {
comp := comp
if comp.Synthesizing() {
inFlight++
}
if missedReconciliation(&comp, c.watchdogThreshold) {
stuckReconciling.WithLabelValues(comp.Spec.Synthesizer.Name).Inc()
}
synth, ok := synthsByName[comp.Spec.Synthesizer.Name]
if !ok {
continue
}
next := newOp(&synth, &comp, nextSlot)
if next != nil && (op == nil || next.Less(op)) {
op = next
}
}
freeSynthesisSlots.Set(float64(c.concurrencyLimit - inFlight))
if op == nil || inFlight >= c.concurrencyLimit {
return ctrl.Result{}, nil
}
if !op.NotBefore.IsZero() { // the next op isn't ready to be dispathced yet
if wait := time.Until(op.NotBefore); wait > 0 {
return ctrl.Result{RequeueAfter: wait}, nil
}
}
logger = logger.WithValues("compositionName", op.Composition.Name, "compositionNamespace", op.Composition.Namespace, "reason", op.Reason, "synthEpoch", synthEpoch)
// Maintain ordering across synth/composition informers by doing a 2PC on the composition
if op.Reason == synthesizerModifiedOp && setSynthEpochAnnotation(op.Composition, synthEpoch) {
if err := c.client.Update(ctx, op.Composition); err != nil {
logger.Error(err, "updating synthesizer epoch")
return ctrl.Result{}, err
}
logger.V(1).Info("updated global synthesizer epoch")
return ctrl.Result{}, nil
}
if err := c.dispatchOp(ctx, op); err != nil {
if errors.IsInvalid(err) {
logger.Error(err, "conflict while dispatching synthesis")
return ctrl.Result{}, err
}
logger.Error(err, "dispatching synthesis operation")
return ctrl.Result{}, err
}
op.Dispatched = time.Now()
c.lastApplied = op
logger.V(0).Info("dispatched synthesis", "synthesisUUID", op.id)
return ctrl.Result{}, nil
}
// getNextCooldownSlot returns the next time at which a deferred synthesis can be dispatched while honoring the configured cooldown period.
func (c *controller) getNextCooldownSlot(comps *apiv1.CompositionList) time.Time {
var next time.Time
for _, comp := range comps.Items {
for _, syn := range []*apiv1.Synthesis{comp.Status.InFlightSynthesis, comp.Status.CurrentSynthesis, comp.Status.PreviousSynthesis} {
if syn != nil && syn.Deferred && syn.Initialized != nil && syn.Initialized.Time.After(next) {
next = syn.Initialized.Time
}
}
}
return next.Add(c.cooldownPeriod)
}
func (c *controller) dispatchOp(ctx context.Context, op *op) error {
patch, err := json.Marshal(op.BuildPatch())
if err != nil {
return err
}
return c.client.Status().Patch(ctx, op.Composition.DeepCopy(), client.RawPatch(types.JSONPatchType, patch))
}
// indexSynthesizers returns an indexed representation of the synthesizers and has the side effect of
// resetting the stuckReconciling metric.
func indexSynthesizers(synths []apiv1.Synthesizer) (byName map[string]apiv1.Synthesizer, epoch string) {
sort.Slice(synths, func(i, j int) bool { return synths[i].Name < synths[j].Name })
byName = map[string]apiv1.Synthesizer{}
h := fnv.New64()
stuckReconciling.Reset()
for _, synth := range synths {
byName[synth.Name] = synth
fmt.Fprintf(h, "%s:%d", synth.UID, synth.Generation)
stuckReconciling.WithLabelValues(synth.Name).Set(0)
}
return byName, hex.EncodeToString(h.Sum(nil))
}
func setSynthEpochAnnotation(comp *apiv1.Composition, value string) bool {
anno := comp.GetAnnotations()
if anno == nil {
anno = map[string]string{}
}
ok := anno[synthEpochAnnotationKey] != value
anno[synthEpochAnnotationKey] = value
comp.SetAnnotations(anno)
return ok
}