internal/execution/executor.go (231 lines of code) (raw):
package execution
import (
"context"
"fmt"
"time"
apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/inputs"
"github.com/Azure/eno/internal/resource"
krmv1 "github.com/Azure/eno/pkg/krm/functions/api/v1"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// maxSliceJsonBytes is the max sum of a resource slice's manifests.
const maxSliceJsonBytes = 1024 * 512
type Executor struct {
Reader client.Reader
Writer client.Client
Handler SynthesizerHandle
}
func (e *Executor) Synthesize(ctx context.Context, env *Env) error {
logger := logr.FromContextOrDiscard(ctx)
comp := &apiv1.Composition{}
comp.Name = env.CompositionName
comp.Namespace = env.CompositionNamespace
err := e.Reader.Get(ctx, client.ObjectKeyFromObject(comp), comp)
if err != nil {
return fmt.Errorf("fetching composition: %w", err)
}
syn := &apiv1.Synthesizer{}
syn.Name = comp.Spec.Synthesizer.Name
err = e.Reader.Get(ctx, client.ObjectKeyFromObject(syn), syn)
if err != nil {
return fmt.Errorf("fetching synthesizer: %w", err)
}
if reason, skip := skipSynthesis(comp, syn, env); skip {
logger.V(0).Info("synthesis is no longer relevant - skipping", "reason", reason)
return nil
}
input, revs, err := e.buildPodInput(ctx, comp, syn)
if err != nil {
return fmt.Errorf("building synthesizer input: %w", err)
}
output, err := e.Handler(ctx, syn, input)
if err != nil {
return fmt.Errorf("executing synthesizer: %w", err)
}
resultErr := findResultError(output)
var sliceRefs []*apiv1.ResourceSliceRef
if resultErr == nil {
sliceRefs, err = e.writeSlices(ctx, comp, output)
if err != nil {
return err
}
}
err = e.updateComposition(ctx, env, comp, syn, sliceRefs, revs, output)
if err != nil {
return err
}
return resultErr
}
func (e *Executor) buildPodInput(ctx context.Context, comp *apiv1.Composition, syn *apiv1.Synthesizer) (*krmv1.ResourceList, []apiv1.InputRevisions, error) {
logger := logr.FromContextOrDiscard(ctx)
bindings := map[string]*apiv1.Binding{}
for _, b := range comp.Spec.Bindings {
b := b
bindings[b.Key] = &b
}
rl := &krmv1.ResourceList{
Kind: krmv1.ResourceListKind,
APIVersion: krmv1.SchemeGroupVersion.String(),
}
revs := []apiv1.InputRevisions{}
for _, r := range syn.Spec.Refs {
key := r.Key
// Get the resource
start := time.Now()
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(schema.GroupVersionKind{Group: r.Resource.Group, Version: r.Resource.Version, Kind: r.Resource.Kind})
b, ok := bindings[key]
if ok {
obj.SetName(b.Resource.Name)
obj.SetNamespace(b.Resource.Namespace)
} else {
obj.SetName(r.Resource.Name)
obj.SetNamespace(r.Resource.Namespace)
}
err := e.Reader.Get(ctx, client.ObjectKeyFromObject(obj), obj)
if err != nil {
return nil, nil, fmt.Errorf("getting resource for ref %q: %w", key, err)
}
anno := obj.GetAnnotations()
if anno == nil {
anno = map[string]string{}
}
anno["eno.azure.io/input-key"] = key
obj.SetAnnotations(anno)
rl.Items = append(rl.Items, obj)
logger.V(0).Info("retrieved input", "key", key, "latency", time.Since(start).Abs().Milliseconds())
// Store the revision to be written to the synthesis status later
revs = append(revs, *resource.NewInputRevisions(obj, key))
}
return rl, revs, nil
}
func (e *Executor) writeSlices(ctx context.Context, comp *apiv1.Composition, rl *krmv1.ResourceList) ([]*apiv1.ResourceSliceRef, error) {
logger := logr.FromContextOrDiscard(ctx)
previous, err := e.fetchPreviousSlices(ctx, comp)
if err != nil {
return nil, err
}
slices, err := resource.Slice(comp, previous, rl.Items, maxSliceJsonBytes)
if err != nil {
return nil, err
}
sliceRefs := make([]*apiv1.ResourceSliceRef, len(slices))
for i, slice := range slices {
start := time.Now()
err = e.writeResourceSlice(ctx, slice)
if err != nil {
return nil, fmt.Errorf("creating resource slice %d: %w", i, err)
}
logger.V(0).Info("wrote resource slice", "resourceSliceName", slice.Name, "latency", time.Since(start).Milliseconds())
sliceRefs[i] = &apiv1.ResourceSliceRef{Name: slice.Name}
}
return sliceRefs, nil
}
// fetchPreviousSlices retrieves the previous slices from the composition's current synthesis status.
// This function runs before the updateComposition function, which will later swap the current synthesis
// to become the previous synthesis. Therefore, the resourceslice retrieved from the current synthesis is
// actually the "previous" resource slices after the update is complete.
func (e *Executor) fetchPreviousSlices(ctx context.Context, comp *apiv1.Composition) ([]*apiv1.ResourceSlice, error) {
if comp.Status.CurrentSynthesis == nil {
return nil, nil // nothing to fetch
}
logger := logr.FromContextOrDiscard(ctx)
slices := []*apiv1.ResourceSlice{}
for _, ref := range comp.Status.CurrentSynthesis.ResourceSlices {
slice := &apiv1.ResourceSlice{}
slice.Name = ref.Name
slice.Namespace = comp.Namespace
err := e.Reader.Get(ctx, client.ObjectKeyFromObject(slice), slice)
if errors.IsNotFound(err) {
logger.V(0).Info("resource slice referenced by composition was not found - skipping", "resourceSliceName", slice.Name)
continue
}
if err != nil {
return nil, fmt.Errorf("fetching current resource slice %q: %w", slice.Name, err)
}
slices = append(slices, slice)
}
return slices, nil
}
func (e *Executor) writeResourceSlice(ctx context.Context, slice *apiv1.ResourceSlice) error {
var bytes int
for _, res := range slice.Spec.Resources {
bytes += len(res.Manifest)
}
// We retry on request timeouts to avoid the overhead of re-synthesizing in cases where we're sometimes unable to reach apiserver
return retry.OnError(retry.DefaultRetry, errors.IsServerTimeout, func() error {
err := e.Writer.Create(ctx, slice)
if err != nil {
logr.FromContextOrDiscard(ctx).Error(err, "error while creating resource slice - will retry later")
return err
}
return nil
})
}
func (e *Executor) updateComposition(ctx context.Context, env *Env, oldComp *apiv1.Composition, syn *apiv1.Synthesizer, refs []*apiv1.ResourceSliceRef, revs []apiv1.InputRevisions, rl *krmv1.ResourceList) error {
logger := logr.FromContextOrDiscard(ctx)
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
comp := &apiv1.Composition{}
err := e.Reader.Get(ctx, client.ObjectKeyFromObject(oldComp), comp)
if err != nil {
return err
}
now := metav1.Now()
comp.Status.InFlightSynthesis.Synthesized = &now
comp.Status.InFlightSynthesis.ResourceSlices = refs
comp.Status.InFlightSynthesis.ObservedSynthesizerGeneration = syn.Generation
comp.Status.InFlightSynthesis.InputRevisions = revs
comp.Status.InFlightSynthesis.Results = nil
for _, result := range rl.Results {
comp.Status.InFlightSynthesis.Results = append(comp.Status.InFlightSynthesis.Results, apiv1.Result{
Message: result.Message,
Severity: result.Severity,
Tags: result.Tags,
})
}
if reason, skip := skipSynthesis(comp, syn, env); skip {
logger.V(0).Info("synthesis is no longer relevant - discarding its output", "reason", reason)
return nil
}
// Swap pending->current->previous syntheses
if findResultError(rl) == nil {
comp.Status.PreviousSynthesis = comp.Status.CurrentSynthesis
comp.Status.CurrentSynthesis = comp.Status.InFlightSynthesis
comp.Status.InFlightSynthesis = nil
}
err = e.Writer.Status().Update(ctx, comp)
if err != nil {
return err
}
logger.V(0).Info("composition status has been updated following successful synthesis")
return nil
})
}
func skipSynthesis(comp *apiv1.Composition, syn *apiv1.Synthesizer, env *Env) (string, bool) {
synthesis := comp.Status.InFlightSynthesis
if synthesis == nil {
return "MissingSynthesis", true
}
if synthesis.UUID != env.SynthesisUUID {
return "UUIDMismatch", true
}
if synthesis.Canceled != nil {
return "SynthesisCanceled", true
}
if inputs.OutOfLockstep(syn, synthesis.InputRevisions) {
return "InputsOutOfLockstep", true
}
if env.Image != "" && env.Image != syn.Spec.Image {
return "ImageMismatch", true
}
return "", false
}
func findResultError(rl *krmv1.ResourceList) error {
if rl == nil {
return nil
}
for _, res := range rl.Results {
if res.Severity == krmv1.ResultSeverityError {
return fmt.Errorf("result: %s", res.Message)
}
}
return nil
}