pkg/controllers/member/internalmembercluster/v1alpha1/controller.go (201 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ // Package internalmembercluster features internalmembercluster controller to report its heartbeat to the hub by updating // internalMemberCluster and cleanup the resources before leave. // For example, MCS agent needs to report the heartbeat after join and cleanup the created MCSes before leave. // For now, there are two kinds of agents exist in the member cluster: MCS agent and ServiceExportImport agent. package internalmembercluster import ( "context" "errors" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" "go.goms.io/fleet-networking/pkg/common/apiretry" ) const ( conditionReasonJoined = "AgentJoined" conditionReasonLeft = "AgentLeft" // we add +-5% jitter jitterPercent = 10 ) // Reconciler reconciles a InternalMemberCluster object. type Reconciler struct { MemberClient client.Client HubClient client.Client AgentType fleetv1alpha1.AgentType } //+kubebuilder:rbac:groups=fleet.azure.com,resources=internalmemberclusters,verbs=get;list;watch //+kubebuilder:rbac:groups=fleet.azure.com,resources=internalmemberclusters/status,verbs=get;update;patch //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=multiclusterservices,verbs=get;list;delete //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceexports,verbs=get;list;delete // Reconcile handles join/leave for the member cluster controllers and updates its heartbeats. // For the MCS controller, it needs to delete created MCS related in the member clusters. // For the ServiceExportImport controllers, it needs to delete created serviceExported related in the member clusters. func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { imcKRef := klog.KRef(req.Namespace, req.Name) startTime := time.Now() klog.V(2).InfoS("Reconciliation starts", "internalMemberCluster", imcKRef) defer func() { latency := time.Since(startTime).Milliseconds() klog.V(2).InfoS("Reconciliation ends", "internalMemberCluster", imcKRef, "latency", latency) }() var imc fleetv1alpha1.InternalMemberCluster if err := r.HubClient.Get(ctx, req.NamespacedName, &imc); err != nil { if apierrors.IsNotFound(err) { klog.V(4).InfoS("Ignoring NotFound internalMemberCluster", "internalMemberCluster", imcKRef) return ctrl.Result{}, nil } klog.ErrorS(err, "Failed to get internal member cluster", "internalMemberCluster", imcKRef) return ctrl.Result{}, err } switch imc.Spec.State { case fleetv1alpha1.ClusterStateJoin: agentStatus := fleetv1alpha1.AgentStatus{ Type: r.AgentType, Conditions: []metav1.Condition{ { Type: string(fleetv1alpha1.AgentJoined), Status: metav1.ConditionTrue, Reason: conditionReasonJoined, ObservedGeneration: imc.GetGeneration(), }, }, LastReceivedHeartbeat: metav1.NewTime(time.Now()), } if err := r.updateAgentStatus(ctx, &imc, agentStatus); err != nil { return ctrl.Result{}, err } // add jitter to the heart beat to mitigate the herding of multiple agents hbInterval := 1000 * imc.Spec.HeartbeatPeriodSeconds jitterRange := int64(hbInterval*jitterPercent) / 100 requeueAfter := time.Millisecond * (time.Duration(hbInterval) + time.Duration(rand.Int63nRange(0, jitterRange)-jitterRange/2)) return ctrl.Result{RequeueAfter: requeueAfter}, nil case fleetv1alpha1.ClusterStateLeave: if r.AgentType == fleetv1alpha1.MultiClusterServiceAgent { if err := r.cleanupMCSRelatedResources(ctx); err != nil { return ctrl.Result{}, err } } if r.AgentType == fleetv1alpha1.ServiceExportImportAgent { if err := r.cleanupServiceExportRelatedResources(ctx); err != nil { return ctrl.Result{}, err } } agentStatus := fleetv1alpha1.AgentStatus{ Type: r.AgentType, Conditions: []metav1.Condition{ { Type: string(fleetv1alpha1.AgentJoined), Status: metav1.ConditionFalse, Reason: conditionReasonLeft, ObservedGeneration: imc.GetGeneration(), }, }, } return ctrl.Result{}, r.updateAgentStatus(ctx, &imc, agentStatus) default: klog.ErrorS(errors.New("unknown state"), "internalMemberCluster", imcKRef, "state", imc.Spec.State) } return ctrl.Result{}, nil } func findAgentStatus(status []fleetv1alpha1.AgentStatus, agentType fleetv1alpha1.AgentType) *fleetv1alpha1.AgentStatus { for i := range status { if status[i].Type == agentType { return &status[i] } } return nil } // setAgentStatus sets the corresponding condition in conditions based on the new agent status. // status must be non-nil. // 1. if the condition of the specified type already exists (all fields of the existing condition are updated to // newCondition, LastTransitionTime is set to now if the new status differs from the old status) // 2. if a condition of the specified type does not exist (LastTransitionTime is set to now() if unset, and newCondition is appended) func setAgentStatus(status *[]fleetv1alpha1.AgentStatus, newStatus fleetv1alpha1.AgentStatus) { existingStatus := findAgentStatus(*status, newStatus.Type) if existingStatus == nil { for i := range newStatus.Conditions { if newStatus.Conditions[i].LastTransitionTime.IsZero() { newStatus.Conditions[i].LastTransitionTime = metav1.NewTime(time.Now()) } } *status = append(*status, newStatus) return } for i := range newStatus.Conditions { meta.SetStatusCondition(&existingStatus.Conditions, newStatus.Conditions[i]) } existingStatus.LastReceivedHeartbeat = newStatus.LastReceivedHeartbeat } func (r *Reconciler) updateAgentStatus(ctx context.Context, imc *fleetv1alpha1.InternalMemberCluster, desiredAgentStatus fleetv1alpha1.AgentStatus) error { setAgentStatus(&imc.Status.AgentStatus, desiredAgentStatus) imcKObj := klog.KObj(imc) klog.V(2).InfoS("Updating internalMemberCluster status", "internalMemberCluster", imcKObj, "agentStatus", imc.Status.AgentStatus) if err := r.HubClient.Status().Update(ctx, imc); err != nil { if apierrors.IsConflict(err) { klog.V(2).InfoS("Failed to update internalMemberCluster status due to conflicts", "internalMemberCluster", klog.KObj(imc)) } else { klog.ErrorS(err, "Failed to update internalMemberCluster status", "internalMemberCluster", klog.KObj(imc)) } return err } return nil } // cleanupMCSRelatedResources deletes the MCS related resources. // Ideally it should stop the controllers. // For now, it tries its best to delete the existing MCS and won't handle the newly created resources for now. func (r *Reconciler) cleanupMCSRelatedResources(ctx context.Context) error { list := &fleetnetv1alpha1.MultiClusterServiceList{} if err := r.MemberClient.List(ctx, list); err != nil { klog.ErrorS(err, "Failed to list multiClusterService") return err } for i := range list.Items { if list.Items[i].ObjectMeta.DeletionTimestamp != nil { continue } deleteFunc := func() error { return r.MemberClient.Delete(ctx, &list.Items[i]) } if err := apiretry.Do(deleteFunc); err != nil && !apierrors.IsNotFound(err) { klog.ErrorS(err, "Failed to delete multiClusterService", "multiClusterService", klog.KObj(&list.Items[i])) return err } } for i := range list.Items { name := types.NamespacedName{Namespace: list.Items[i].GetNamespace(), Name: list.Items[i].GetName()} mcs := fleetnetv1alpha1.MultiClusterService{} getFunc := func() error { err := r.MemberClient.Get(ctx, name, &mcs) return err } if err := apiretry.WaitUntilObjectDeleted(ctx, getFunc); err != nil { klog.ErrorS(err, "Wait the multiClusterService to be deleted", "multiClusterService", name) return err } } klog.V(2).InfoS("Completed cleanup mcs related resources", "mcsCounter", len(list.Items)) return nil } // cleanupServiceExportRelatedResources deletes the serviceExport related resources. // Ideally it should stop the controllers. // For now, it tries its best to delete the existing serviceExport and won't handle the newly created resources for now. func (r *Reconciler) cleanupServiceExportRelatedResources(ctx context.Context) error { list := &fleetnetv1alpha1.ServiceExportList{} if err := r.MemberClient.List(ctx, list); err != nil { klog.ErrorS(err, "Failed to list serviceExport") return err } for i := range list.Items { if list.Items[i].ObjectMeta.DeletionTimestamp != nil { continue } deleteFunc := func() error { return r.MemberClient.Delete(ctx, &list.Items[i]) } if err := apiretry.Do(deleteFunc); err != nil && !apierrors.IsNotFound(err) { klog.ErrorS(err, "Failed to delete serviceExport", "serviceExport", klog.KObj(&list.Items[i])) return err } } for i := range list.Items { name := types.NamespacedName{Namespace: list.Items[i].GetNamespace(), Name: list.Items[i].GetName()} svcExport := fleetnetv1alpha1.ServiceExport{} getFunc := func() error { return r.MemberClient.Get(ctx, name, &svcExport) } if err := apiretry.WaitUntilObjectDeleted(ctx, getFunc); err != nil { klog.ErrorS(err, "Failed to get the serviceExport", "serviceExport", name) return err } } klog.V(2).InfoS("Completed cleanup serviceExport related resources", "serviceExportCounter", len(list.Items)) return nil } // SetupWithManager sets up the controller with the Manager. func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&fleetv1alpha1.InternalMemberCluster{}). Complete(r) }