pkg/controllers/hub/internalserviceimport/controller.go (267 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ // Package internalserviceimport features the InternalServiceImport controller for importing an exported // service into a member cluster. package internalserviceimport import ( "context" "encoding/json" "fmt" "reflect" "time" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" "go.goms.io/fleet-networking/pkg/common/objectmeta" ) const ( internalSvcImportCleanupFinalizer = "networking.fleet.azure.com/internalsvcimport-cleanup" svcImportCleanupFinalizer = "networking.fleet.azure.com/serviceimport-cleanup" internalSvcImportSvcRefNamespacedNameFieldKey = ".spec.serviceImportReference.namespacedName" internalSvcImportRetryInterval = time.Second * 2 ) // Reconciler reconciles an InternalServiceImport object. type Reconciler struct { HubClient client.Client } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceimports,verbs=get;list;watch //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceimports,verbs=get;list //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceimports/status,verbs=get;update;patch // Reconcile checks if a member cluster can import a Service from the hub cluster and fulfills the import. func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { internalSvcImportRef := klog.KRef(req.Namespace, req.Name) startTime := time.Now() klog.V(2).InfoS("Reconciliation starts", "internalServiceImport", internalSvcImportRef) defer func() { latency := time.Since(startTime).Milliseconds() klog.V(2).InfoS("Reconciliation ends", "internalServiceImport", internalSvcImportRef, "latency", latency) }() // Retrieve the InternalServiceImport object. internalSvcImport := &fleetnetv1alpha1.InternalServiceImport{} if err := r.HubClient.Get(ctx, req.NamespacedName, internalSvcImport); err != nil { // Skip the reconciliation if the InternalServiceImport does not exist. if errors.IsNotFound(err) { klog.V(4).InfoS("Ignoring NotFound internalServiceImport", "internalServiceImport", internalSvcImportRef) return ctrl.Result{}, nil } klog.ErrorS(err, "Failed to get internalServiceImport", "internalServiceImport", internalSvcImportRef) return ctrl.Result{}, err } // Check if the ServiceImport exists. svcNS := internalSvcImport.Spec.ServiceImportReference.Namespace svcName := internalSvcImport.Spec.ServiceImportReference.Name svcImportKey := types.NamespacedName{Namespace: svcNS, Name: svcName} svcImportRef := klog.KRef(svcNS, svcName) klog.V(2).InfoS("Check if the Service can be imported", "serviceImport", svcImportRef, "internalServiceImport", internalSvcImportRef) svcImport := &fleetnetv1alpha1.ServiceImport{} err := r.HubClient.Get(ctx, svcImportKey, svcImport) switch { case err != nil && errors.IsNotFound(err): // The ServiceImport does not exist, and the Service will not be imported. If a Service spec has been // added to InternalServiceImport status, it will be cleared; the InternalServiceImport cleanup finalizer will // be removed as well (if applicable). klog.V(2).InfoS("ServiceImport does not exist; spec of imported Service (if any) will be cleared", "serviceImport", svcImportRef, "internalServiceImport", internalSvcImportRef) return r.clearInternalServiceImportStatus(ctx, internalSvcImport) case err != nil: // An unexpected error occurred. klog.ErrorS(err, "Failed to get ServiceImport", "serviceImport", svcImportRef, "internalServiceImport", internalSvcImportRef) return ctrl.Result{}, err case svcImport.DeletionTimestamp == nil && len(svcImport.Status.Clusters) == 0: // The ServiceImport is being processed; requeue the InternalServiceImport for later processing. klog.V(2).InfoS("ServiceImport is being processed; requeue for later processing", "serviceImport", svcImportRef, "internalServiceImport", internalSvcImportRef) return ctrl.Result{RequeueAfter: internalSvcImportRetryInterval}, nil } // Withdraw Service import request if the InternalServiceImport has been marked for deletion, or if the // ServceImport has been marked for deletion. if internalSvcImport.DeletionTimestamp != nil || svcImport.DeletionTimestamp != nil { if controllerutil.ContainsFinalizer(internalSvcImport, internalSvcImportCleanupFinalizer) { klog.V(2).InfoS("InternalServiceImport is deleted; withdraw the Service import request", "internalServiceImport", internalSvcImportRef) return r.withdrawServiceImport(ctx, svcImport, internalSvcImport) } // The absence of the InternalServiceImport cleanup finalizer guarantees that no attempt has been made to // import the Service to the member cluster, and as a result, no action is needed on this controller's end. return ctrl.Result{}, nil } // The cluster namespace and ID of the member cluster which attempts to import the Service. clusterNamespace := fleetnetv1alpha1.ClusterNamespace(internalSvcImport.Namespace) clusterID := fleetnetv1alpha1.ClusterID(internalSvcImport.Spec.ServiceImportReference.ClusterID) // Find out which member clusters have imported the Service. svcInUseBy := extractServiceInUseByInfoFromServiceImport(svcImport) if len(svcInUseBy.MemberClusters) > 0 { if _, ok := svcInUseBy.MemberClusters[clusterNamespace]; ok { // The current member cluster has already imported Service; fulfill the import (i.e. update the Service // spec kept in InternalServiceImport status). klog.V(2).InfoS("The member cluster has imported the Service; will sync the imported Service spec", "serviceImport", svcImportRef, "internalServiceImport", internalSvcImportRef) if err := r.fulfillInternalServiceImport(ctx, svcImport, internalSvcImport); err != nil { klog.ErrorS(err, "Failed to fulfill service import by updating InternalServiceImport status", "serviceImport", svcImportRef, "internalServiceImport", internalSvcImportRef) return ctrl.Result{}, err } return ctrl.Result{}, nil } // Another member cluster has already imported the Service; at this moment, it is required that one Service // can only be imported exactly once across the whole fleet, and as a result, attempt to import the Service // by the current member cluster will be aborted. klog.V(2).InfoS("A member cluster has already imported the Service", "internalServiceImport", internalSvcImportRef, "serviceInUseBy", svcInUseBy) return r.clearInternalServiceImportStatus(ctx, internalSvcImport) } klog.V(2).InfoS("The Service can be imported; will sync the Service spec", "serviceImport", svcImportRef, "internalServiceImport", internalSvcImportRef) // Add cleanup finalizer to InternalServiceImport. This must happen before an attempt to import a Service // is fulfilled. if err := r.addInternalServiceImportCleanupFinalizer(ctx, internalSvcImport); err != nil { klog.ErrorS(err, "Failed to add cleanup finalizer to InternalServiceImport", "internalServiceImport", internalSvcImportRef) return ctrl.Result{}, err } // Update the ServiceInUseBy annotation, which claims the Service for the current member cluster to import. svcInUseBy.MemberClusters[clusterNamespace] = clusterID if err := r.annotateServiceImportWithServiceInUseByInfo(ctx, svcImport, svcInUseBy); err != nil { klog.ErrorS(err, "Failed to annotate ServiceImport with ServiceInUseBy info", "serviceImport", svcImportRef, "serviceInUseBy", svcInUseBy) return ctrl.Result{}, err } // Fulfill the import (i.e. update the Service spec kept in InternalServiceImport status). if err := r.fulfillInternalServiceImport(ctx, svcImport, internalSvcImport); err != nil { klog.ErrorS(err, "Failed to fulfill service import by updating InternalServiceImport status", "serviceImport", svcImportRef, "internalServiceImport", internalSvcImportRef) return ctrl.Result{}, err } return ctrl.Result{}, nil } // SetupWithManager sets up the InternalServiceImport controller with a controller manager. func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { // Set up an index for efficient InternalServiceImport lookup. internalSvcImportIndexerFunc := func(o client.Object) []string { internalSvcImport, ok := o.(*fleetnetv1alpha1.InternalServiceImport) if !ok { return []string{} } return []string{internalSvcImport.Spec.ServiceImportReference.NamespacedName} } if err := mgr.GetFieldIndexer().IndexField(ctx, &fleetnetv1alpha1.InternalServiceImport{}, internalSvcImportSvcRefNamespacedNameFieldKey, internalSvcImportIndexerFunc, ); err != nil { klog.ErrorS(err, "Failed to set up InternalServiceImport index") return err } // Enqueue InternalServiceImports for processing when a ServiceImport changes. eventHandlers := handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { svcImport, ok := o.(*fleetnetv1alpha1.ServiceImport) if !ok { return []reconcile.Request{} } internalSvcImportList := &fleetnetv1alpha1.InternalServiceImportList{} fieldMatcher := client.MatchingFields{ internalSvcImportSvcRefNamespacedNameFieldKey: fmt.Sprintf("%s/%s", svcImport.Namespace, svcImport.Name), } if err := r.HubClient.List(ctx, internalSvcImportList, fieldMatcher); err != nil { klog.ErrorS(err, "Failed to list InternalServiceImports for an ServiceImport", "serviceImport", klog.KObj(svcImport)) return []reconcile.Request{} } reqs := make([]reconcile.Request, 0, len(internalSvcImportList.Items)) for _, internalSvcImport := range internalSvcImportList.Items { reqs = append(reqs, reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: internalSvcImport.Namespace, Name: internalSvcImport.Name, }, }) } return reqs }) return ctrl.NewControllerManagedBy(mgr). For(&fleetnetv1alpha1.InternalServiceImport{}). Watches(&fleetnetv1alpha1.ServiceImport{}, eventHandlers). Complete(r) } // withdrawServiceImport withdraws the request to import a Service to a member cluster. func (r *Reconciler) withdrawServiceImport(ctx context.Context, svcImport *fleetnetv1alpha1.ServiceImport, internalSvcImport *fleetnetv1alpha1.InternalServiceImport) (ctrl.Result, error) { // The cluster namespace of the member cluster which imports the Service. clusterNamespace := fleetnetv1alpha1.ClusterNamespace(internalSvcImport.Namespace) // Update the annotated ServiceInUseBy information. svcInUseBy := extractServiceInUseByInfoFromServiceImport(svcImport) if _, ok := svcInUseBy.MemberClusters[clusterNamespace]; ok { delete(svcInUseBy.MemberClusters, clusterNamespace) switch { case len(svcInUseBy.MemberClusters) > 0: // There are still member clusters importing the Service after the withdrawal; the ServiceInUseBy // annotation will be updated. Note that with current semantics (one import only across the fleet) // this branch should not run. if err := r.annotateServiceImportWithServiceInUseByInfo(ctx, svcImport, svcInUseBy); err != nil { klog.ErrorS(err, "Failed to annotate ServiceImport with ServiceInUseBy info", "serviceImport", klog.KObj(svcImport), "serviceInUseBy", svcInUseBy) return ctrl.Result{}, err } case len(svcInUseBy.MemberClusters) == 0: // No member cluster imports the Service after the withdrawal; the ServiceInUseBy annotation (and // the cleanup finalizer) on ServiceImport will be cleared. if err := r.clearServiceInUseByInfoFromServiceImport(ctx, svcImport); err != nil { klog.ErrorS(err, "Failed to clear ServiceImport ServiceInUseBy annotation", "serviceImport", klog.KObj(svcImport), "serviceInUseBy", svcInUseBy) return ctrl.Result{}, err } } } // A rare occurrence as it is, it could happen that the InternalServiceImport has the cleanup finalizer, // yet the import is not annotated on the ServiceImport. This is usually caused by data corruption, or // direct ServiceInUseBy annotation manipulation by the user; and in this case the controller will skip // the updating. // Remove the cleanup finalizer. if err := r.removeInternalServiceImportCleanupFinalizer(ctx, internalSvcImport); err != nil { klog.ErrorS(err, "Failed to remove cleanup finalizer from InternalServiceImport", "internalServiceImport", klog.KObj(internalSvcImport)) return ctrl.Result{}, err } return ctrl.Result{}, nil } // clearInternalServiceImportStatus clears the status (Service spec) from an InternalServiceImport; if the // InternalServiceImport has a cleanup finalizer added, it will be removed as well. func (r *Reconciler) clearInternalServiceImportStatus(ctx context.Context, internalSvcImport *fleetnetv1alpha1.InternalServiceImport) (ctrl.Result, error) { // Remove the cleanup finalizer from InternalServiceImport (if applicable). if err := r.removeInternalServiceImportCleanupFinalizer(ctx, internalSvcImport); err != nil { klog.ErrorS(err, "Failed to remove cleanup finalizer from InternalServiceImport", "internalServiceImport", klog.KObj(internalSvcImport)) return ctrl.Result{}, err } clearedInternalSvcImportStatus := fleetnetv1alpha1.ServiceImportStatus{} if reflect.DeepEqual(internalSvcImport.Status, clearedInternalSvcImportStatus) { // The state has stablized; skip the clearing. return ctrl.Result{}, nil } internalSvcImport.Status = clearedInternalSvcImportStatus if err := r.HubClient.Status().Update(ctx, internalSvcImport); err != nil { klog.ErrorS(err, "Failed to clear InternalServiceImport status", "internalServiceImport", klog.KObj(internalSvcImport)) return ctrl.Result{}, err } return ctrl.Result{}, nil } // removeInternalServiceImportCleanupFinalizer removes the cleanup finalizer from an InternalServiceImport. func (r *Reconciler) removeInternalServiceImportCleanupFinalizer(ctx context.Context, internalSvcImport *fleetnetv1alpha1.InternalServiceImport) error { if controllerutil.ContainsFinalizer(internalSvcImport, internalSvcImportCleanupFinalizer) { controllerutil.RemoveFinalizer(internalSvcImport, internalSvcImportCleanupFinalizer) return r.HubClient.Update(ctx, internalSvcImport) } return nil } // addInternalServiceImportCleanupFinalizer adds the cleanup finalizer to an InternalServiceImport. func (r *Reconciler) addInternalServiceImportCleanupFinalizer(ctx context.Context, internalSvcImport *fleetnetv1alpha1.InternalServiceImport) error { if !controllerutil.ContainsFinalizer(internalSvcImport, internalSvcImportCleanupFinalizer) { controllerutil.AddFinalizer(internalSvcImport, internalSvcImportCleanupFinalizer) return r.HubClient.Update(ctx, internalSvcImport) } return nil } // annotateServiceImportWithServiceInUseByInfo annotates ServiceInUseBy information on a ServiceImport. func (r *Reconciler) annotateServiceImportWithServiceInUseByInfo(ctx context.Context, svcImport *fleetnetv1alpha1.ServiceImport, svcInUseBy *fleetnetv1alpha1.ServiceInUseBy) error { if svcImport.Annotations == nil { // Initialize the annoation map if no annotations have been added before. svcImport.Annotations = map[string]string{} } data, err := json.Marshal(svcInUseBy) if err != nil { return err } svcImport.Annotations[objectmeta.ServiceImportAnnotationServiceInUseBy] = string(data) controllerutil.AddFinalizer(svcImport, svcImportCleanupFinalizer) return r.HubClient.Update(ctx, svcImport) } // clearServiceInUseByInfoFromServiceImport clears the ServiceInUseBy annotation from a ServiceImport, and its // cleanup finalizer. func (r *Reconciler) clearServiceInUseByInfoFromServiceImport(ctx context.Context, svcImport *fleetnetv1alpha1.ServiceImport) error { delete(svcImport.Annotations, objectmeta.ServiceImportAnnotationServiceInUseBy) controllerutil.RemoveFinalizer(svcImport, svcImportCleanupFinalizer) return r.HubClient.Update(ctx, svcImport) } // fulfillInternalServiceImport fulfills an import of a Service by syncing the Service spec to the status of an // InternalServiceImport. func (r *Reconciler) fulfillInternalServiceImport(ctx context.Context, svcImport *fleetnetv1alpha1.ServiceImport, internalSvcImport *fleetnetv1alpha1.InternalServiceImport) error { updatedInternalSvcImportStatus := svcImport.Status.DeepCopy() if reflect.DeepEqual(internalSvcImport.Status, updatedInternalSvcImportStatus) { // The state has stablized; skip the fulfillment. return nil } internalSvcImport.Status = *updatedInternalSvcImportStatus return r.HubClient.Status().Update(ctx, internalSvcImport) } // extractServiceInUseByInfoFromServiceImport extracts ServiceInUseBy information from annotations on a ServiceImport. func extractServiceInUseByInfoFromServiceImport(svcImport *fleetnetv1alpha1.ServiceImport) *fleetnetv1alpha1.ServiceInUseBy { data, ok := svcImport.ObjectMeta.Annotations[objectmeta.ServiceImportAnnotationServiceInUseBy] if !ok { // The ServiceInUseBy annotation is absent on ServiceImport. return &fleetnetv1alpha1.ServiceInUseBy{ MemberClusters: map[fleetnetv1alpha1.ClusterNamespace]fleetnetv1alpha1.ClusterID{}, } } svcInUseBy := &fleetnetv1alpha1.ServiceInUseBy{} if err := json.Unmarshal([]byte(data), svcInUseBy); err != nil { // The data cannot be unmarshalled; normally this should never happen, unless data corruption occurs, // or the annotations are manipulated directly by the user. The controller will have to overwrite // the data to fix the problem. // // Note that this situation, should it ever happens, can lead to an inconsistent state where one or more // member clusters believe that it has successfully imported a Service, yet the import itself is not // recognized by the fleet networking control plane (more specifically, the import is not documented // as ServiceImport annotations). Resync can eventually address this inconsistency, but it may take a long // while for the system to recover. klog.ErrorS(err, "Failed to unmarshal ServiceInUseBy data", "serviceImport", klog.KObj(svcImport), "data", data) return &fleetnetv1alpha1.ServiceInUseBy{ MemberClusters: map[fleetnetv1alpha1.ClusterNamespace]fleetnetv1alpha1.ClusterID{}, } } return svcInUseBy }