pkg/controllers/hub/internalserviceexport/controller.go (188 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ // Package internalserviceexport features the InternalServiceExport controller for exporting services from member to // the fleet. package internalserviceexport import ( "context" "time" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" "go.goms.io/fleet-networking/pkg/common/condition" "go.goms.io/fleet-networking/pkg/common/objectmeta" ) // Reconciler reconciles a InternalServiceExport object. type Reconciler struct { client.Client // RetryInternal is the wait time for the controller to requeue the request and to wait for the // ServiceImport controller to resolve the service Spec. RetryInternal time.Duration } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceexports,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceexports/status,verbs=get;update;patch //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceexports/finalizers,verbs=update //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceimports,verbs=get;list;watch;create;update;patch //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceimports/status,verbs=get;update;patch // Reconcile creates/updates ServiceImport by watching internalServiceExport objects. // To simplify the design and implementation in the first phase, the serviceExport will be marked as conflicted if its // service spec does not match with serviceImport. // We may support KEP1645 Constraints and Conflict Resolution in the future. // https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#constraints-and-conflict-resolution func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { name := req.NamespacedName internalServiceExport := fleetnetv1alpha1.InternalServiceExport{} internalServiceExportKRef := klog.KRef(name.Namespace, name.Name) startTime := time.Now() klog.V(2).InfoS("Reconciliation starts", "internalServiceExport", internalServiceExportKRef) defer func() { latency := time.Since(startTime).Milliseconds() klog.V(2).InfoS("Reconciliation ends", "internalServiceExport", internalServiceExportKRef, "latency", latency) }() if err := r.Client.Get(ctx, name, &internalServiceExport); err != nil { if errors.IsNotFound(err) { klog.V(4).InfoS("Ignoring NotFound internalServiceExport", "internalServiceExport", internalServiceExportKRef) return ctrl.Result{}, nil } klog.ErrorS(err, "Failed to get internalServiceExport", "internalServiceExport", internalServiceExportKRef) return ctrl.Result{}, err } if internalServiceExport.ObjectMeta.DeletionTimestamp != nil { return r.handleDelete(ctx, &internalServiceExport) } // register finalizer if !controllerutil.ContainsFinalizer(&internalServiceExport, objectmeta.InternalServiceExportFinalizer) { controllerutil.AddFinalizer(&internalServiceExport, objectmeta.InternalServiceExportFinalizer) if err := r.Update(ctx, &internalServiceExport); err != nil { klog.ErrorS(err, "Failed to add internalServiceExport finalizer", "internalServiceExport", internalServiceExportKRef) return ctrl.Result{}, err } } // handle update return r.handleUpdate(ctx, &internalServiceExport) } func (r *Reconciler) handleDelete(ctx context.Context, internalServiceExport *fleetnetv1alpha1.InternalServiceExport) (ctrl.Result, error) { // the internalServiceExport is being deleted if !controllerutil.ContainsFinalizer(internalServiceExport, objectmeta.InternalServiceExportFinalizer) { return ctrl.Result{}, nil } internalServiceExportKObj := klog.KObj(internalServiceExport) klog.V(2).InfoS("Removing internalServiceExport", "internalServiceExport", internalServiceExportKObj) // get serviceImport serviceImport := &fleetnetv1alpha1.ServiceImport{} serviceImportName := types.NamespacedName{Namespace: internalServiceExport.Spec.ServiceReference.Namespace, Name: internalServiceExport.Spec.ServiceReference.Name} serviceImportKRef := klog.KRef(serviceImportName.Namespace, serviceImportName.Name) if err := r.Client.Get(ctx, serviceImportName, serviceImport); err != nil { klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef, "internalServiceExport", internalServiceExportKObj) if !errors.IsNotFound(err) { return ctrl.Result{}, err } return r.removeFinalizer(ctx, internalServiceExport) } // check serviceImport spec if len(serviceImport.Status.Ports) == 0 { // Requeue the request and waiting for the ServiceImport controller to resolve the spec. // In case serviceImport picks the same spec as the deleting one at the same time and controller misses removing // the clusterID from the serviceImport. klog.V(2).InfoS("Waiting for serviceImport controller to resolve the spec", "serviceImport", serviceImportKRef, "internalServiceExport", internalServiceExportKObj) return ctrl.Result{RequeueAfter: r.RetryInternal}, nil } oldStatus := serviceImport.Status.DeepCopy() removeClusterFromServiceImportStatus(serviceImport, internalServiceExport.Spec.ServiceReference.ClusterID) if err := r.updateServiceImportStatus(ctx, serviceImport, oldStatus); err != nil { return ctrl.Result{}, err } return r.removeFinalizer(ctx, internalServiceExport) } func removeClusterFromServiceImportStatus(serviceImport *fleetnetv1alpha1.ServiceImport, clusterID string) { var updatedClusters []fleetnetv1alpha1.ClusterStatus for _, c := range serviceImport.Status.Clusters { if c.Cluster != clusterID { updatedClusters = append(updatedClusters, c) } } if len(updatedClusters) == 0 { serviceImport.Status = fleetnetv1alpha1.ServiceImportStatus{} } else { serviceImport.Status.Clusters = updatedClusters } } func addClusterToServiceImportStatus(serviceImport *fleetnetv1alpha1.ServiceImport, clusterID string) { for _, c := range serviceImport.Status.Clusters { if c.Cluster == clusterID { return } } serviceImport.Status.Clusters = append(serviceImport.Status.Clusters, fleetnetv1alpha1.ClusterStatus{Cluster: clusterID}) } func (r *Reconciler) updateServiceImportStatus(ctx context.Context, serviceImport *fleetnetv1alpha1.ServiceImport, oldStatus *fleetnetv1alpha1.ServiceImportStatus) error { if equality.Semantic.DeepEqual(&serviceImport.Status, oldStatus) { // no change return nil } serviceImportKObj := klog.KObj(serviceImport) klog.V(2).InfoS("Updating the serviceImport status", "serviceImport", serviceImportKObj, "oldStatus", oldStatus, "status", serviceImport.Status) if err := r.Client.Status().Update(ctx, serviceImport); err != nil { klog.ErrorS(err, "Failed to update the serviceImport status", "serviceImport", serviceImportKObj, "oldStatus", oldStatus, "status", serviceImport.Status) return err } return nil } func (r *Reconciler) removeFinalizer(ctx context.Context, internalServiceExport *fleetnetv1alpha1.InternalServiceExport) (ctrl.Result, error) { // remove the finalizer controllerutil.RemoveFinalizer(internalServiceExport, objectmeta.InternalServiceExportFinalizer) if err := r.Client.Update(ctx, internalServiceExport); err != nil { klog.ErrorS(err, "Failed to remove internalServiceExport finalizer", "internalServiceExport", klog.KObj(internalServiceExport)) return ctrl.Result{}, err } return ctrl.Result{}, nil } func (r *Reconciler) updateInternalServiceExportStatus(ctx context.Context, internalServiceExport *fleetnetv1alpha1.InternalServiceExport, conflict bool) error { desiredCond := condition.UnconflictedServiceExportConflictCondition(*internalServiceExport) if conflict { desiredCond = condition.ConflictedServiceExportConflictCondition(*internalServiceExport) } currentCond := meta.FindStatusCondition(internalServiceExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportConflict)) if condition.EqualCondition(currentCond, &desiredCond) { return nil } exportKObj := klog.KObj(internalServiceExport) oldStatus := internalServiceExport.Status.DeepCopy() meta.SetStatusCondition(&internalServiceExport.Status.Conditions, desiredCond) klog.V(2).InfoS("Updating internalServiceExport status", "internalServiceExport", exportKObj, "status", internalServiceExport.Status, "oldStatus", oldStatus) if err := r.Status().Update(ctx, internalServiceExport); err != nil { klog.ErrorS(err, "Failed to update internalServiceExport status", "internalServiceExport", exportKObj, "status", internalServiceExport.Status, "oldStatus", oldStatus) return err } return nil } func (r *Reconciler) handleUpdate(ctx context.Context, internalServiceExport *fleetnetv1alpha1.InternalServiceExport) (ctrl.Result, error) { internalServiceExportKObj := klog.KObj(internalServiceExport) // get serviceImport serviceImport := &fleetnetv1alpha1.ServiceImport{} serviceImportName := types.NamespacedName{Namespace: internalServiceExport.Spec.ServiceReference.Namespace, Name: internalServiceExport.Spec.ServiceReference.Name} serviceImportKRef := klog.KRef(serviceImportName.Namespace, serviceImportName.Name) if err := r.Client.Get(ctx, serviceImportName, serviceImport); err != nil { if !errors.IsNotFound(err) { klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef, "internalServiceExport", internalServiceExportKObj) return ctrl.Result{}, err } serviceImport = &fleetnetv1alpha1.ServiceImport{ ObjectMeta: metav1.ObjectMeta{ Namespace: serviceImportName.Namespace, Name: serviceImportName.Name, }, } klog.V(2).InfoS("Creating serviceImport", "serviceImport", serviceImportKRef, "internalServiceExport", internalServiceExportKObj) if err := r.Client.Create(ctx, serviceImport); err != nil { klog.ErrorS(err, "Failed to create or update service import", "serviceImport", serviceImportKRef, "internalServiceExport", internalServiceExportKObj) return ctrl.Result{}, err } } if len(serviceImport.Status.Ports) == 0 { // Requeue the request and waiting for the ServiceImport controller to resolve the spec. klog.V(3).InfoS("Waiting for serviceImport controller to resolve the spec", "serviceImport", serviceImportKRef, "internalServiceExport", internalServiceExportKObj) return ctrl.Result{RequeueAfter: r.RetryInternal}, nil } oldStatus := serviceImport.Status.DeepCopy() clusterID := internalServiceExport.Spec.ServiceReference.ClusterID // To simplify the implementation, we compare the whole ports structure. // TODO, change to compare the ports by ignoring the order and protocol and port are the map keys. if !equality.Semantic.DeepEqual(serviceImport.Status.Ports, internalServiceExport.Spec.Ports) { removeClusterFromServiceImportStatus(serviceImport, clusterID) if err := r.updateServiceImportStatus(ctx, serviceImport, oldStatus); err != nil { return ctrl.Result{}, err } // It's possible, eg, there is only one serviceExport and its spec has been changed. // ServiceImport stores the old spec of this ServiceExport and later the serviceExport changes its spec. if len(serviceImport.Status.Ports) == 0 { klog.V(3).InfoS("Removed the cluster and waiting for serviceImport controller to resolve the spec", "serviceImport", serviceImportKRef, "internalServiceExport", internalServiceExportKObj) // Requeue the request and waiting for the ServiceImport controller to resolve the spec. return ctrl.Result{RequeueAfter: r.RetryInternal}, nil } return ctrl.Result{}, r.updateInternalServiceExportStatus(ctx, internalServiceExport, true) } addClusterToServiceImportStatus(serviceImport, clusterID) if err := r.updateServiceImportStatus(ctx, serviceImport, oldStatus); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, r.updateInternalServiceExportStatus(ctx, internalServiceExport, false) } // SetupWithManager sets up the controller with the Manager. func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&fleetnetv1alpha1.InternalServiceExport{}). Complete(r) }