pkg/controllers/hub/endpointsliceexport/controller.go (286 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ // Package endpointsliceexport features the EndpointSliceExport controller running on the hub cluster, which // is responsible for distributing EndpointSlices exported from member clusters. package endpointsliceexport import ( "context" "encoding/json" "fmt" "time" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" "go.goms.io/fleet-networking/pkg/common/apiretry" "go.goms.io/fleet-networking/pkg/common/objectmeta" ) const ( endpointSliceExportCleanupFinalizer = "networking.fleet.azure.com/endpointsliceexport-cleanup" endpointSliceImportNameFieldKey = ".metadata.name" endpointSliceExportOwnerSvcNamespacedNameFieldKey = ".spec.ownerServiceReference.namespacedName" endpointSliceExportRetryInterval = time.Second * 5 ) var ( endpointSliceImportIndexerFunc = func(o client.Object) []string { endpointSliceImport, ok := o.(*fleetnetv1alpha1.EndpointSliceImport) if !ok { return []string{} } return []string{endpointSliceImport.ObjectMeta.Name} } endpointSliceExportIndexerFunc = func(o client.Object) []string { endpointSliceExport, ok := o.(*fleetnetv1alpha1.EndpointSliceExport) if !ok { return []string{} } return []string{endpointSliceExport.Spec.OwnerServiceReference.NamespacedName} } ) // Reconciler reconciles the distribution of EndpointSlices across the fleet. type Reconciler struct { HubClient client.Client } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceexports,verbs=get;list;watch;create;update;patch //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceimports,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceimports,verbs=get;list;watch //+kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslices,verbs=get;create;update;patch;delete;list;watch // Reconcile distributes an exported EndpointSlice (in the form of EndpointSliceExports) to whichever member // cluster that has imported the EndpointSlice's owner Service. func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { endpointSliceExportRef := klog.KRef(req.Namespace, req.Name) startTime := time.Now() klog.V(2).InfoS("Reconciliation starts", "endpointSliceExport", endpointSliceExportRef) defer func() { latency := time.Since(startTime).Milliseconds() klog.V(2).InfoS("Reconciliation ends", "endpointSliceExport", endpointSliceExportRef, "latency", latency) }() // Retrieve the EndpointSliceExport object. endpointSliceExport := &fleetnetv1alpha1.EndpointSliceExport{} if err := r.HubClient.Get(ctx, req.NamespacedName, endpointSliceExport); err != nil { // Skip the reconciliation if the EndpointSliceExport does not exist; this should only happen when the // EndpointSliceExport does not have a finalizer set and is deleted right before the controller gets a // chance to reconcile it. The absence of the finalizer guarantees that the EndpointSlice has never been // distributed across the fleet, thus no action is needed on this controller's side. if errors.IsNotFound(err) { klog.V(4).InfoS("Ignoring NotFound endpointSliceExport", "endpointSliceExport", endpointSliceExportRef) return ctrl.Result{}, nil } klog.ErrorS(err, "Failed to get endpointSliceExport", "endpointSliceExport", endpointSliceExportRef) return ctrl.Result{}, err } // Check if the EndpointSliceExport has been marked for deletion; withdraw EndpointSliceImports across // the fleet if the EndpointSlice has been distributed. if endpointSliceExport.DeletionTimestamp != nil { if controllerutil.ContainsFinalizer(endpointSliceExport, endpointSliceExportCleanupFinalizer) { // The presence of the EndpointSliceExport cleanup finalizer guarantees that an attempt has been made // to distribute the EndpointSlice. klog.V(2).InfoS("EndpointSliceExport deleted; withdraw distributed EndpointSlices", "endpointSliceExport", endpointSliceExportRef) if err := r.withdrawAllEndpointSliceImports(ctx, endpointSliceExport); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } return ctrl.Result{}, nil } // Inquire the corresponding ServiceImport to find out which member clusters the EndpointSlice should be // distributed to. ownerSvcNS := endpointSliceExport.Spec.OwnerServiceReference.Namespace ownerSvc := endpointSliceExport.Spec.OwnerServiceReference.Name svcImportKey := types.NamespacedName{Namespace: ownerSvcNS, Name: ownerSvc} svcImport := &fleetnetv1alpha1.ServiceImport{} svcImportRef := klog.KRef(ownerSvcNS, ownerSvc) klog.V(2).InfoS("Inquire ServceImport to find out which member clusters have requested the EndpointSlice", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef) err := r.HubClient.Get(ctx, svcImportKey, svcImport) switch { case err != nil && errors.IsNotFound(err): // The corresponding ServiceImport does not exist; normally this will never happen as an EndpointSlice can // only be exported after its owner Service has been successfully exported. It could be that the controller // observes some in-between state, such as a Service is deleted right after being exported successfully, // and the system does not get to withdraw exported EndpointSlices from the Service yet. The controller // will requeue the EndpointSliceExport and wait until the state stablizes. klog.V(2).InfoS("ServiceImport does not exist", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef) return ctrl.Result{RequeueAfter: endpointSliceExportRetryInterval}, nil case err != nil: // An unexpected error occurs. klog.ErrorS(err, "Failed to get ServiceImport", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef) return ctrl.Result{}, err case len(svcImport.Status.Clusters) == 0: // The corresponding ServiceImport exists but it is still being processed. This is also a case that // should not happen in normal situations. The controller could be, once again, observing some in-between // state. The EndpointSliceExport will be requeued and re-processed when the state stablizes. klog.V(2).InfoS("ServiceImport is being processed (no accepted exports yet)", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef) return ctrl.Result{RequeueAfter: endpointSliceExportRetryInterval}, nil } data, ok := svcImport.ObjectMeta.Annotations[objectmeta.ServiceImportAnnotationServiceInUseBy] if !ok { // No cluster has requested to import the EndpointSlice's owner service. // If the exported EndpointSlice has been distributed across the fleet before; withdraw the // EndpointSliceImports. klog.V(2).InfoS("No cluster has requested to import the Service; withdraw distributed EndpointSlices", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef) if err := r.withdrawAllEndpointSliceImports(ctx, endpointSliceExport); err != nil { return ctrl.Result{}, err } // There is no need to remove the local EndpointSlice copy in this situation (the copy might be in // use by load balancing solutions on the hub cluster). return ctrl.Result{}, nil } svcInUseBy := &fleetnetv1alpha1.ServiceInUseBy{} if err := json.Unmarshal([]byte(data), svcInUseBy); err != nil { klog.ErrorS(err, "Failed to unmarshal data for in-use Services from ServiceImport annotations", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef, "data", data) // This error cannot be recovered by retrying; a reconciliation will be triggered when the ServiceInUseBy // data is overwritten. return ctrl.Result{}, nil } // Distribute the EndpointSlices. // Add cleanup finalizer to the EndpointSliceExport; this must happen before EndpointSlice is distributed. if !controllerutil.ContainsFinalizer(endpointSliceExport, endpointSliceExportCleanupFinalizer) { if err := r.addEndpointSliceExportCleanupFinalizer(ctx, endpointSliceExport); err != nil { klog.ErrorS(err, "Failed to add cleanup finalizer to EndpointSliceExport", "endpointSliceExport", endpointSliceExportRef) return ctrl.Result{}, err } } // Scan for EndpointSlices to withdraw and EndpointSlices to create or update. klog.V(2).InfoS("Scan for EndpointSliceImports to withdraw and to create/update", "serviceInUseBy", svcInUseBy, "endpointSliceExport", endpointSliceExport) endpointSliceImportsToWithdraw, endpointSlicesImportsToCreateOrUpdate, err := r.scanForEndpointSliceImports(ctx, endpointSliceExport, svcInUseBy) if err != nil { return ctrl.Result{}, err } klog.V(4).InfoS("EndpointSliceImports to withdraw", "count", len(endpointSliceImportsToWithdraw)) klog.V(4).InfoS("EndpointSliceImports to create or update", "count", len(endpointSlicesImportsToCreateOrUpdate)) // Delete distributed EndpointSlices that are no longer needed. // // Note: At this moment, it is guaranteed that any Service can only be imported once across the fleet, consequently // len(endpointSliceImportsToDelete) is at most 1. However, this behavior is subject to change as fleet // networking evolves, and for future compatibility reasons, the function assumes that a Service might have been // imported to multiple clusters. for idx := range endpointSliceImportsToWithdraw { endpointSliceImport := endpointSliceImportsToWithdraw[idx] // Skip if the EndpointSliceImport has been marked for deletion. if endpointSliceImport.DeletionTimestamp != nil { continue } klog.V(4).InfoS("Withdraw endpointSlice", "endpointSliceImport", klog.KObj(endpointSliceImport), "endpointSliceExport", endpointSliceExportRef) if err := apiretry.Do(func() error { return r.HubClient.Delete(ctx, endpointSliceImport) }); err != nil && !errors.IsNotFound(err) { klog.ErrorS(err, "Failed to withdraw EndpointSlice", "endpointSliceImport", klog.KObj(endpointSliceImport), "endpointSliceExport", endpointSliceExportRef) return ctrl.Result{}, err } } // Create or update distributed EndpointSlices. // // Note: At this moment, it is guaranteed that any Service can only be imported once across the fleet, consequently // len(endpointSliceImportsToCreateOrUpdate) is at most 1. However, this behavior is subject to change as fleet // networking evolves, and for future compatibility reasons, the function assumes that a Service might have been // imported to multiple clusters. for idx := range endpointSlicesImportsToCreateOrUpdate { endpointSliceImport := endpointSlicesImportsToCreateOrUpdate[idx] klog.V(4).InfoS("Create/update endpointSliceImport", "endpointSliceImport", klog.KObj(endpointSliceImport), "endpointSliceExport", endpointSliceExportRef) var op controllerutil.OperationResult if err := apiretry.Do(func() error { var createOrUpdateErr error op, createOrUpdateErr = controllerutil.CreateOrUpdate(ctx, r.HubClient, endpointSliceImport, func() error { endpointSliceImport.Spec = *endpointSliceExport.Spec.DeepCopy() return nil }) return createOrUpdateErr }); err != nil { klog.ErrorS(err, "Failed to create or update EndpointSliceImport", "endpointSliceImport", klog.KObj(endpointSliceImport), "endpointSliceExport", endpointSliceExportRef, "op", op) return ctrl.Result{}, err } } return ctrl.Result{}, nil } // SetupWithManager sets up the EndpointSliceExport controller with a controller manager. func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { // Set up an index for efficient EndpointSliceImport lookup. if err := mgr.GetFieldIndexer().IndexField(ctx, &fleetnetv1alpha1.EndpointSliceImport{}, endpointSliceImportNameFieldKey, endpointSliceImportIndexerFunc, ); err != nil { klog.ErrorS(err, "Failed to set up index for EndpointSliceImport") return err } // Set up an index for efficient EndpointSliceExport lookup. if err := mgr.GetFieldIndexer().IndexField(ctx, &fleetnetv1alpha1.EndpointSliceExport{}, endpointSliceExportOwnerSvcNamespacedNameFieldKey, endpointSliceExportIndexerFunc, ); err != nil { klog.ErrorS(err, "Failed to set up index for EndpointSliceExport") return err } // Enqueue EndpointSliceExports for processing when a ServiceImport changes. eventHandlers := handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { svcImport, ok := o.(*fleetnetv1alpha1.ServiceImport) if !ok { return []reconcile.Request{} } endpointSliceExportList := &fleetnetv1alpha1.EndpointSliceExportList{} fieldMatcher := client.MatchingFields{ endpointSliceExportOwnerSvcNamespacedNameFieldKey: fmt.Sprintf("%s/%s", svcImport.Namespace, svcImport.Name), } if err := r.HubClient.List(ctx, endpointSliceExportList, fieldMatcher); err != nil { klog.ErrorS(err, "Failed to list EndpointSliceExports for an imported Service", "serviceImport", klog.KObj(svcImport)) return []reconcile.Request{} } reqs := make([]reconcile.Request, 0, len(endpointSliceExportList.Items)) for _, endpointSliceExport := range endpointSliceExportList.Items { reqs = append(reqs, reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: endpointSliceExport.Namespace, Name: endpointSliceExport.Name, }, }) } return reqs }) return ctrl.NewControllerManagedBy(mgr). For(&fleetnetv1alpha1.EndpointSliceExport{}). Watches(&fleetnetv1alpha1.ServiceImport{}, eventHandlers). Complete(r) } // withdrawEndpointSliceImports withdraws EndpointSliceImports distributed across the fleet. func (r *Reconciler) withdrawAllEndpointSliceImports(ctx context.Context, endpointSliceExport *fleetnetv1alpha1.EndpointSliceExport) error { // List all EndpointSlices distributed as EndpointSliceImports. endpointSliceImportList := &fleetnetv1alpha1.EndpointSliceImportList{} listOpts := client.MatchingFields{ endpointSliceImportNameFieldKey: endpointSliceExport.Name, } if err := r.HubClient.List(ctx, endpointSliceImportList, listOpts); err != nil { klog.ErrorS(err, "Failed to list EndpointSliceImports by a specific name", "endpointSliceImportName", endpointSliceExport.Name, "endpointSliceExport", klog.KObj(endpointSliceExport)) return err } // Withdraw EndpointSliceImports from member clusters. for idx := range endpointSliceImportList.Items { endpointSliceImport := endpointSliceImportList.Items[idx] if err := apiretry.Do(func() error { return r.HubClient.Delete(ctx, &endpointSliceImport) }); err != nil && !errors.IsNotFound(err) { klog.ErrorS(err, "Failed to withdraw EndpointSliceImport", "endpointSliceImport", klog.KObj(&endpointSliceImport), "endpointSliceExport", klog.KObj(endpointSliceExport)) return err } } // Remove the EndpointSliceExport cleanup finalizer. if err := r.removeEndpointSliceExportCleanupFinalizer(ctx, endpointSliceExport); err != nil { klog.ErrorS(err, "Failed to remove EndpointSliceImport cleanup finalizer", "endpointSliceExport", klog.KObj(endpointSliceExport)) return err } return nil } // removeEndpointSliceExportCleanupFinalizer removes the cleanup finalizer from an EndpointSliceExport. func (r *Reconciler) removeEndpointSliceExportCleanupFinalizer(ctx context.Context, endpointSliceExport *fleetnetv1alpha1.EndpointSliceExport) error { controllerutil.RemoveFinalizer(endpointSliceExport, endpointSliceExportCleanupFinalizer) return r.HubClient.Update(ctx, endpointSliceExport) } // addEndpointSliceExportCleanupFinalizer adds the cleanup finalizer to an EndpointSliceExport. func (r *Reconciler) addEndpointSliceExportCleanupFinalizer(ctx context.Context, endpointSliceExport *fleetnetv1alpha1.EndpointSliceExport) error { controllerutil.AddFinalizer(endpointSliceExport, endpointSliceExportCleanupFinalizer) return r.HubClient.Update(ctx, endpointSliceExport) } // scanForEndpointSliceImports lists all EndpointSliceImports across the fleet created from a specific // EndpointSliceExport, and matches them with the set of member clusters that have requested the EndpointSlice; // it returns // * a list of EndpointSliceImports to withdraw (as their member clusters no longer need them); and // * a list of EndpointSliceImports to create or update (as some member clusters have requested them). // // Note: At this moment, it is guaranteed that any Service can only be imported once across the fleet, consequently // len(svcInUseBy.MemberClusters) should always be 1. However, this behavior is subject to change as fleet // networking evolves, and for future compatibility reasons, the function assumes that a Service might have been // imported to multiple clusters. func (r *Reconciler) scanForEndpointSliceImports( ctx context.Context, endpointSliceExport *fleetnetv1alpha1.EndpointSliceExport, svcInUseBy *fleetnetv1alpha1.ServiceInUseBy, ) (endpointSliceImportsToWithdraw, endpointSliceImportsToCreateOrUpdate []*fleetnetv1alpha1.EndpointSliceImport, err error) { // List all EndpointSlices distributed as EndpointSliceImports. endpointSliceImportList := &fleetnetv1alpha1.EndpointSliceImportList{} listOpts := client.MatchingFields{ endpointSliceImportNameFieldKey: endpointSliceExport.Name, } if err := r.HubClient.List(ctx, endpointSliceImportList, listOpts); err != nil { klog.ErrorS(err, "Failed to list EndpointSliceImports by a specific name", "endpointSliceImportName", endpointSliceExport.Name, "endpointSliceExport", klog.KObj(endpointSliceExport)) return endpointSliceImportsToWithdraw, endpointSliceImportsToCreateOrUpdate, err } // Match the EndpointSliceImports with the member clusters that have requested the EndpointSlice. for idx := range endpointSliceImportList.Items { endpointSliceImport := endpointSliceImportList.Items[idx] nsKey := fleetnetv1alpha1.ClusterNamespace(endpointSliceImport.Namespace) if _, ok := svcInUseBy.MemberClusters[nsKey]; ok { // A member cluster has requested the EndpointSlice and an EndpointSlice has been distributed to the // cluster; the EndpointSliceImport should be updated. endpointSliceImportsToCreateOrUpdate = append(endpointSliceImportsToCreateOrUpdate, &endpointSliceImport) delete(svcInUseBy.MemberClusters, nsKey) } else { // No member cluster has imported the EndpointSlice yet an EndpointSlice has been distributed to the cluster; // the EndpointSliceImport should be withdrawn. endpointSliceImportsToWithdraw = append(endpointSliceImportsToWithdraw, &endpointSliceImport) } } // A member cluster has requested the EndpointSlice but no EndpointSlice has been distributed to the cluster; // an EndpointSliceImport should be created. for ns := range svcInUseBy.MemberClusters { endpointSliceImport := &fleetnetv1alpha1.EndpointSliceImport{ ObjectMeta: metav1.ObjectMeta{ Namespace: string(ns), Name: endpointSliceExport.Name, }, } endpointSliceImportsToCreateOrUpdate = append(endpointSliceImportsToCreateOrUpdate, endpointSliceImport) } return endpointSliceImportsToWithdraw, endpointSliceImportsToCreateOrUpdate, nil }