internal/controllers/synthesis/gc.go (141 lines of code) (raw):

package synthesis import ( "context" "fmt" "time" apiv1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/manager" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) type podGarbageCollector struct { client client.Client creationTimeout time.Duration } func NewPodGC(mgr ctrl.Manager, creationTimeout time.Duration) error { c := &podGarbageCollector{ client: mgr.GetClient(), creationTimeout: creationTimeout, } return ctrl.NewControllerManagedBy(mgr). For(&corev1.Pod{}). WithLogConstructor(manager.NewLogConstructor(mgr, "podGarbageCollector")). Complete(c) } func (p *podGarbageCollector) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := logr.FromContextOrDiscard(ctx) pod := &corev1.Pod{} err := p.client.Get(ctx, req.NamespacedName, pod) if errors.IsNotFound(err) { return ctrl.Result{}, nil } if err != nil { logger.Error(err, "failed to get pod") return ctrl.Result{}, err } if pod.DeletionTimestamp != nil { return ctrl.Result{}, nil } logger = logger.WithValues("podName", pod.Name, "podNamespace", pod.Namespace) if pod.Labels == nil { logger.V(0).Info("saw a pod without any labels - this shouldn't be possible!") return ctrl.Result{}, nil } // Avoid waiting for the lease to expire for broken nodes if delta := timeWaitingForKubelet(pod, time.Now()); delta > 0 { if delta < p.creationTimeout { return ctrl.Result{RequeueAfter: p.creationTimeout - delta}, nil } logger = logger.WithValues("reason", "ContainerCreationTimeout") return ctrl.Result{}, p.deletePod(ctx, pod, logger) } // GC pods from deleted compositions comp := &apiv1.Composition{} comp.Name = pod.GetLabels()[compositionNameLabelKey] comp.Namespace = pod.GetLabels()[compositionNamespaceLabelKey] err = p.client.Get(ctx, client.ObjectKeyFromObject(comp), comp) logger = logger.WithValues("compositionName", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation, "synthesisAge", synthesisAge(comp)) if errors.IsNotFound(err) || comp.DeletionTimestamp != nil { logger = logger.WithValues("reason", "CompositionDeleted") return ctrl.Result{}, p.deletePod(ctx, pod, logger) } if err != nil { logger.Error(err, "failed to get composition resource") return ctrl.Result{}, err } // GC pods from missing synthesizers syn := &apiv1.Synthesizer{} syn.Name = comp.Spec.Synthesizer.Name err = p.client.Get(ctx, client.ObjectKeyFromObject(syn), syn) logger = logger.WithValues("synthesizerName", syn.Name, "synthesizerGeneration", syn.Generation) if errors.IsNotFound(err) { logger = logger.WithValues("reason", "SynthesizerDeleted") return ctrl.Result{}, p.deletePod(ctx, pod, logger) } if err != nil { logger.Error(err, "failed to get synthesizer") return ctrl.Result{}, err } // Ignore brand new pods since the pod/composition informer might not be in sync const gracePeriod = time.Second delta := gracePeriod - time.Since(pod.CreationTimestamp.Time) if delta > 0 { return ctrl.Result{RequeueAfter: delta}, nil } // The image tag must match the current synthesizer, otherwise other properties (e.g. refs) may be incorrect if img := findContainerImage(pod); img != "" && img != syn.Spec.Image { logger = logger.WithValues("reason", "ImageChanged") return ctrl.Result{}, p.deletePod(ctx, pod, logger) } if syn := comp.Status.InFlightSynthesis; syn != nil { if syn.Canceled != nil { logger = logger.WithValues("reason", "Timeout") return ctrl.Result{}, p.deletePod(ctx, pod, logger) } // A new synthesis has replaced the previous if syn.UUID != pod.Labels[synthesisIDLabelKey] { logger = logger.WithValues("reason", "Superseded") return ctrl.Result{}, p.deletePod(ctx, pod, logger) } return ctrl.Result{RequeueAfter: time.Second}, nil // still active } // In-flight synthesis being swapped to current == synthesis completed if syn := comp.Status.CurrentSynthesis; syn != nil && syn.UUID == pod.Labels[synthesisIDLabelKey] { logger = logger.WithValues("reason", "Success") return ctrl.Result{}, p.deletePod(ctx, pod, logger) } // This condition should only be able to happen when the composition has been deleted logger = logger.WithValues("reason", "Orphaned") return ctrl.Result{}, p.deletePod(ctx, pod, logger) } func (p *podGarbageCollector) deletePod(ctx context.Context, pod *corev1.Pod, logger logr.Logger) error { if len(pod.Status.ContainerStatuses) > 0 { logger = logger.WithValues("restarts", pod.Status.ContainerStatuses[0].RestartCount) } err := p.client.Delete(ctx, pod, &client.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &pod.UID, ResourceVersion: &pod.ResourceVersion}}) if err != nil { return fmt.Errorf("deleting pod: %w", err) } logger.Info("deleted synthesizer pod", "latency", time.Since(pod.CreationTimestamp.Time).Milliseconds()) return nil } func timeWaitingForKubelet(pod *corev1.Pod, now time.Time) time.Duration { if len(pod.Status.ContainerStatuses) > 0 { return 0 } for _, cond := range pod.Status.Conditions { if cond.Type != corev1.PodScheduled { continue } if cond.Status == corev1.ConditionFalse { return 0 } scheduledTime := &cond.LastTransitionTime.Time return now.Sub(*scheduledTime) } return 0 } func synthesisAge(comp *apiv1.Composition) *int64 { syn := comp.Status.InFlightSynthesis if syn == nil || syn.Initialized == nil { return nil } return ptr.To(time.Since(syn.Initialized.Time).Milliseconds()) }