internal/controllers/scheduling/op.go (226 lines of code) (raw):
package scheduling
import (
"bytes"
"context"
"fmt"
"hash/fnv"
"reflect"
"time"
apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/inputs"
"github.com/go-logr/logr"
"github.com/google/uuid"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
type op struct {
Synthesizer *apiv1.Synthesizer
Composition *apiv1.Composition
Reason opReason
NotBefore time.Time
Dispatched time.Time
id uuid.UUID // set when patch is built
synthRolloutHash []byte // memoized
}
func newOp(synth *apiv1.Synthesizer, comp *apiv1.Composition, nextCooldownSlot time.Time) *op {
o := &op{Synthesizer: synth, Composition: comp}
var ok bool
o.Reason, ok = classifyOp(synth, comp)
if !ok {
syn := comp.Status.InFlightSynthesis
if syn == nil || syn.Canceled == nil {
return nil
}
// Retry canceled syntheses with exponential backoff
backoff := time.Duration(float64(syn.Canceled.Sub(syn.Initialized.Time)) * float64(syn.Attempts) * 1.5)
o.Reason = retrySynthesisOp
o.NotBefore = syn.Canceled.Add(backoff)
return o
}
if o.Reason.Deferred() {
o.NotBefore = nextCooldownSlot
}
// Deferred ops have a special property: they won't replace an in-flight synthesis
// This protects frequent synth/input changes from effectively blocking synthesis
if o.Reason.Deferred() && comp.Synthesizing() {
return nil
}
return o
}
func classifyOp(synth *apiv1.Synthesizer, comp *apiv1.Composition) (opReason, bool) {
switch {
case comp.DeletionTimestamp != nil || !inputs.Exist(synth, comp) || inputs.OutOfLockstep(synth, comp.Status.InputRevisions) || !controllerutil.ContainsFinalizer(comp, "eno.azure.io/cleanup"):
return 0, false
case (comp.Status.CurrentSynthesis == nil || comp.Status.CurrentSynthesis.Synthesized == nil) && comp.Status.InFlightSynthesis == nil:
return initialSynthesisOp, true
case comp.ShouldForceResynthesis():
return forcedResynthesisOp, true
case compositionHasBeenModified(comp):
return compositionModifiedOp, true
case comp.ShouldIgnoreSideEffects():
return 0, false
}
syn := comp.Status.CurrentSynthesis
if comp.Status.InFlightSynthesis != nil {
syn = comp.Status.InFlightSynthesis
}
nonDeferredInputChanges, deferredInputChanges := inputChangeCount(synth, comp.Status.InputRevisions, syn.InputRevisions)
if nonDeferredInputChanges > 0 {
return inputModifiedOp, true
}
if deferredInputChanges > 0 {
return deferredInputModifiedOp, true
}
if syn.ObservedSynthesizerGeneration > 0 && syn.ObservedSynthesizerGeneration < synth.Generation {
return synthesizerModifiedOp, true
}
return 0, false
}
func compositionHasBeenModified(comp *apiv1.Composition) bool {
if comp.Status.InFlightSynthesis != nil {
return comp.Status.InFlightSynthesis.ObservedCompositionGeneration != comp.Generation
}
return comp.Status.CurrentSynthesis != nil && comp.Status.CurrentSynthesis.ObservedCompositionGeneration != comp.Generation
}
func (o *op) Less(than *op) bool {
if o.Reason == synthesizerModifiedOp && than.Reason == synthesizerModifiedOp {
cmp := bytes.Compare(o.SynthRolloutOrderHash(), than.SynthRolloutOrderHash())
if cmp != 0 {
return cmp > 0
}
}
if !o.NotBefore.IsZero() && than.NotBefore.IsZero() && o.NotBefore.Before(than.NotBefore) {
return true
}
if o.Reason == than.Reason {
return o.Composition.UID < than.Composition.UID
}
return o.Reason < than.Reason
}
// SynthRolloutOrderHash returns a hash that represents this composition's order in the rollout of a particular synthesizer generation.
// This mechanism maintains determinism while shuffling the rollout order of every synthesizer change.
func (o *op) SynthRolloutOrderHash() []byte {
if o.synthRolloutHash == nil {
hash := fnv.New64()
fmt.Fprintf(hash, "%s:%d:%s", o.Synthesizer.UID, o.Synthesizer.Generation, o.Composition.UID)
o.synthRolloutHash = hash.Sum(nil)
}
return o.synthRolloutHash
}
func (o *op) HasBeenPatched(ctx context.Context, cli client.Reader, grace time.Duration) (bool, time.Duration, error) {
logger := logr.FromContextOrDiscard(ctx)
wait := time.Since(o.Dispatched)
if wait > grace {
logger.V(1).Info("operation cache grace period expired", "synthesisUUID", o.id)
return true, wait, nil
}
comp := &apiv1.Composition{}
err := cli.Get(ctx, client.ObjectKeyFromObject(o.Composition), comp)
if err != nil {
return false, 0, err
}
if syn := comp.Status.CurrentSynthesis; syn != nil && syn.UUID == o.id.String() {
return true, wait, nil
}
if syn := comp.Status.InFlightSynthesis; syn != nil && syn.UUID == o.id.String() {
return true, wait, nil
}
return false, wait, nil
}
func (o *op) BuildPatch() any {
ops := []jsonPatch{}
if o.id == uuid.Nil {
// defer generating the uuid until we know the op is definitely going to be dispatched
o.id = uuid.Must(uuid.NewRandom())
}
// Initialize the status if it's nil (zero value struct on the client == nil on the server side)
if reflect.DeepEqual(o.Composition.Status, apiv1.CompositionStatus{}) {
ops = append(ops,
jsonPatch{Op: "test", Path: "/status", Value: nil},
jsonPatch{Op: "add", Path: "/status", Value: map[string]any{}})
}
// The input watch controller might have concurrently modified the input revisions
ops = append(ops, jsonPatch{Op: "test", Path: "/status/inputRevisions", Value: o.Composition.Status.InputRevisions})
// Protect against zombie leaders running this controller
if syn := o.Composition.Status.InFlightSynthesis; syn == nil {
ops = append(ops, jsonPatch{Op: "test", Path: "/status/inFlightSynthesis", Value: nil})
} else {
ops = append(ops,
jsonPatch{Op: "test", Path: "/status/inFlightSynthesis/uuid", Value: syn.UUID},
jsonPatch{Op: "test", Path: "/status/inFlightSynthesis/observedCompositionGeneration", Value: syn.ObservedCompositionGeneration},
jsonPatch{Op: "test", Path: "/status/inFlightSynthesis/synthesized", Value: syn.Synthesized})
}
var attempts int
if syn := o.Composition.Status.InFlightSynthesis; syn != nil && o.Reason == retrySynthesisOp {
attempts = syn.Attempts
}
ops = append(ops, jsonPatch{
Op: "replace",
Path: "/status/inFlightSynthesis",
Value: map[string]any{
"observedCompositionGeneration": o.Composition.Generation,
"initialized": time.Now().Format(time.RFC3339),
"uuid": o.id.String(),
"deferred": o.Reason.Deferred(),
"attempts": attempts + 1,
},
})
return ops
}
type jsonPatch struct {
Op string `json:"op"`
Path string `json:"path"`
Value any `json:"value"`
}
type opReason int
const (
initialSynthesisOp opReason = iota
forcedResynthesisOp
compositionModifiedOp
inputModifiedOp
deferredInputModifiedOp
synthesizerModifiedOp
retrySynthesisOp
)
var allReasons = []opReason{initialSynthesisOp, forcedResynthesisOp, compositionModifiedOp, inputModifiedOp, deferredInputModifiedOp, synthesizerModifiedOp}
func (r opReason) Deferred() bool { return r == deferredInputModifiedOp || r == synthesizerModifiedOp }
func (r opReason) String() string {
switch r {
case initialSynthesisOp:
return "InitialSynthesis"
case forcedResynthesisOp:
return "ForcedResynthesis"
case compositionModifiedOp:
return "CompositionModified"
case inputModifiedOp:
return "InputModified"
case deferredInputModifiedOp:
return "DeferredInputModified"
case synthesizerModifiedOp:
return "SynthesizerModified"
case retrySynthesisOp:
return "Retry"
default:
return "Unknown"
}
}
func inputChangeCount(synth *apiv1.Synthesizer, a, b []apiv1.InputRevisions) (nonDeferred, deferred int) {
refsByKey := map[string]apiv1.Ref{}
for _, ref := range synth.Spec.Refs {
ref := ref
refsByKey[ref.Key] = ref
}
bByKey := map[string]apiv1.InputRevisions{}
for _, br := range b {
bByKey[br.Key] = br
}
for _, ar := range a {
ref, exists := refsByKey[ar.Key]
if !exists {
continue
}
br, exists := bByKey[ar.Key]
if !exists {
continue
}
if br.Less(ar) {
if ref.Defer {
deferred++
} else {
nonDeferred++
}
}
}
return nonDeferred, deferred
}