internal/controllers/synthesis/lifecycle.go (117 lines of code) (raw):

package synthesis import ( "context" "fmt" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" apiv1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/manager" ) type Config struct { ExecutorImage string PodNamespace string PodServiceAccount string TaintTolerationKey string TaintTolerationValue string NodeAffinityKey string NodeAffinityValue string } type podLifecycleController struct { config *Config client client.Client noCacheReader client.Reader } // NewPodLifecycleController is responsible for creating and deleting pods as needed to synthesize compositions. func NewPodLifecycleController(mgr ctrl.Manager, cfg *Config) error { c := &podLifecycleController{ config: cfg, client: mgr.GetClient(), noCacheReader: mgr.GetAPIReader(), } return ctrl.NewControllerManagedBy(mgr). For(&apiv1.Composition{}). WatchesRawSource(source.TypedKind[*corev1.Pod](mgr.GetCache(), &corev1.Pod{}, c.newPodEventHandler())). WithLogConstructor(manager.NewLogConstructor(mgr, "podLifecycleController")). Complete(c) } func (c *podLifecycleController) newPodEventHandler() handler.TypedEventHandler[*corev1.Pod, reconcile.Request] { return &handler.TypedFuncs[*corev1.Pod, reconcile.Request]{ CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { }, UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { if e.ObjectNew.DeletionTimestamp == nil || e.ObjectOld.DeletionTimestamp != nil || e.ObjectNew.Labels == nil { return } nsn := types.NamespacedName{ Name: e.ObjectNew.GetLabels()[compositionNameLabelKey], Namespace: e.ObjectNew.GetLabels()[compositionNamespaceLabelKey], } q.Add(reconcile.Request{NamespacedName: nsn}) }, DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { if e.DeleteStateUnknown || e.Object.Labels == nil { return } nsn := types.NamespacedName{ Name: e.Object.GetLabels()[compositionNameLabelKey], Namespace: e.Object.GetLabels()[compositionNamespaceLabelKey], } q.Add(reconcile.Request{NamespacedName: nsn}) }, } } func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := logr.FromContextOrDiscard(ctx) comp := &apiv1.Composition{} err := c.client.Get(ctx, req.NamespacedName, comp) if err != nil { logger.Error(err, "failed to get composition resource") return ctrl.Result{}, client.IgnoreNotFound(err) } if comp.DeletionTimestamp != nil || !controllerutil.ContainsFinalizer(comp, "eno.azure.io/cleanup") || comp.Status.InFlightSynthesis == nil || comp.Status.InFlightSynthesis.Canceled != nil { return ctrl.Result{}, nil } logger = logger.WithValues("compositionName", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation, "synthesisUUID", comp.Status.InFlightSynthesis.UUID) syn := &apiv1.Synthesizer{} syn.Name = comp.Spec.Synthesizer.Name err = c.client.Get(ctx, client.ObjectKeyFromObject(syn), syn) if err != nil { logger.Error(err, "failed to get synthesizer") return ctrl.Result{}, client.IgnoreNotFound(err) } if syn != nil { logger = logger.WithValues("synthesizerName", syn.Name, "synthesizerGeneration", syn.Generation) } // Confirm that a pod doesn't already exist for this synthesis without trusting informers. // This protects against cases where synthesis has recently started and something causes // another tick of this loop before the pod write hits the informer. pods := &corev1.PodList{} err = c.noCacheReader.List(ctx, pods, client.InNamespace(c.config.PodNamespace), client.MatchingLabels{ synthesisIDLabelKey: comp.Status.InFlightSynthesis.UUID, }) if err != nil { return ctrl.Result{}, fmt.Errorf("checking for existing pod: %w", err) } for _, pod := range pods.Items { if pod.DeletionTimestamp == nil { logger.V(1).Info(fmt.Sprintf("refusing to create new synthesizer pod because the pod %q already exists and has not been deleted", pod.Name)) return ctrl.Result{}, nil } } // If we made it this far it's safe to create a pod pod := newPod(c.config, comp, syn) err = c.client.Create(ctx, pod) if err != nil { return ctrl.Result{}, fmt.Errorf("creating pod: %w", err) } logger.V(0).Info("created synthesizer pod", "podName", pod.Name) sytheses.Inc() return ctrl.Result{}, nil }