pkg/controllers/membercluster/v1beta1/membercluster_controller.go (542 lines of code) (raw):
/*
Copyright 2025 The KubeFleet Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1beta1
import (
"context"
"fmt"
"reflect"
"time"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
runtime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrl "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"go.goms.io/fleet/apis"
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/metrics"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/condition"
"go.goms.io/fleet/pkg/utils/controller"
)
// Event reasons recorded on the MemberCluster when the controller creates,
// patches or updates one of the resources it manages for that cluster.
const (
	eventReasonNamespaceCreated   = "NamespaceCreated"
	eventReasonNamespacePatched   = "NamespacePatched"
	eventReasonRoleCreated        = "RoleCreated"
	eventReasonRoleUpdated        = "RoleUpdated"
	eventReasonRoleBindingCreated = "RoleBindingCreated"
	eventReasonRoleBindingUpdated = "RoleBindingUpdated"
	eventReasonIMCCreated         = "InternalMemberClusterCreated"
	eventReasonIMCSpecUpdated     = "InternalMemberClusterSpecUpdated"
)

// Reasons set on the MemberCluster ReadyToJoin / Joined conditions; they are
// also reused as the reason of the events emitted when those conditions change.
const (
	reasonMemberClusterReadyToJoin    = "MemberClusterReadyToJoin"
	reasonMemberClusterNotReadyToJoin = "MemberClusterNotReadyToJoin"
	reasonMemberClusterJoined         = "MemberClusterJoined"
	reasonMemberClusterLeft           = "MemberClusterLeft"
	reasonMemberClusterUnknown        = "MemberClusterJoinStateUnknown"
)
// Reconciler reconciles a MemberCluster object.
type Reconciler struct {
client.Client
// recorder emits events on the MemberCluster; it is assigned in SetupWithManager.
recorder record.EventRecorder
// NetworkingAgentsEnabled controls whether the networking agents are added to the
// set of expected agents (see SetupWithManager) when aggregating the joined condition.
NetworkingAgentsEnabled bool
// MaxConcurrentReconciles is the max number of concurrent reconciles per controller.
MaxConcurrentReconciles int
// ForceDeleteWaitTime is how long to wait, measured from the member cluster's deletion
// timestamp, before force deleting it. It is a time.Duration, so the unit is carried by the value.
ForceDeleteWaitTime time.Duration
// agents is the set of agent types expected to report a status. Membership is tested
// with a plain lookup (r.agents[type]), so only keys stored with the value true count.
agents map[clusterv1beta1.AgentType]bool
}
// Reconcile handles one MemberCluster request.
//
// A MemberCluster that no longer exists is ignored. One with a deletion
// timestamp is handed to handleDelete. Otherwise the finalizer is ensured,
// the hub-side resources are synced through join, and the status reported by
// the InternalMemberCluster (if any) is copied back and persisted.
func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtime.Result, error) {
	begin := time.Now()
	klog.V(2).InfoS("MemberCluster reconciliation starts", "memberCluster", req.NamespacedName)
	defer func() {
		klog.V(2).InfoS("MemberCluster reconciliation ends", "memberCluster", req.NamespacedName, "latency", time.Since(begin).Milliseconds())
	}()

	memberCluster := &clusterv1beta1.MemberCluster{}
	if getErr := r.Client.Get(ctx, req.NamespacedName, memberCluster); getErr != nil {
		klog.ErrorS(getErr, "Failed to get member cluster", "memberCluster", req.Name)
		return runtime.Result{}, client.IgnoreNotFound(getErr)
	}
	ref := klog.KObj(memberCluster)

	// A deleting/leaving member cluster goes down the garbage collection path;
	// it works on a copy of the object that was just read.
	if !memberCluster.DeletionTimestamp.IsZero() {
		klog.V(2).InfoS("The member cluster is leaving", "memberCluster", ref)
		return r.handleDelete(ctx, memberCluster.DeepCopy())
	}

	if finalizerErr := r.ensureFinalizer(ctx, memberCluster); finalizerErr != nil {
		klog.ErrorS(finalizerErr, "Failed to add the finalizer to member cluster", "memberCluster", ref)
		return runtime.Result{}, finalizerErr
	}

	imc, imcErr := r.getInternalMemberCluster(ctx, memberCluster.GetName())
	if imcErr != nil {
		return runtime.Result{}, imcErr
	}

	if joinErr := r.join(ctx, memberCluster, imc); joinErr != nil {
		klog.ErrorS(joinErr, "Failed to join", "memberCluster", ref)
		return runtime.Result{}, joinErr
	}

	// Mirror the InternalMemberCluster status onto the MemberCluster and persist it.
	r.syncInternalMemberClusterStatus(imc, memberCluster)
	statusErr := r.updateMemberClusterStatus(ctx, memberCluster)
	switch {
	case statusErr == nil:
		return runtime.Result{}, nil
	case apierrors.IsConflict(statusErr):
		klog.V(2).InfoS("Failed to update status due to conflicts", "memberCluster", ref)
	default:
		klog.ErrorS(statusErr, "Failed to update status", "memberCluster", ref)
	}
	return runtime.Result{}, client.IgnoreNotFound(statusErr)
}
// handleDelete handles the delete event of the member cluster, makes sure the agent has finished leaving the fleet first and
// then garbage collects all the resources in the cluster namespace.
func (r *Reconciler) handleDelete(ctx context.Context, mc *clusterv1beta1.MemberCluster) (runtime.Result, error) {
mcObjRef := klog.KObj(mc)
if !controllerutil.ContainsFinalizer(mc, placementv1beta1.MemberClusterFinalizer) {
klog.V(2).InfoS("No need to do anything for the deleting member cluster without a finalizer", "memberCluster", mcObjRef)
return runtime.Result{}, nil
}
// check if the namespace still exist
var currentNS corev1.Namespace
namespaceName := fmt.Sprintf(utils.NamespaceNameFormat, mc.Name)
if err := r.Client.Get(ctx, types.NamespacedName{Name: namespaceName}, ¤tNS); err != nil {
if !apierrors.IsNotFound(err) {
klog.ErrorS(err, "Failed to get the member cluster namespace", "memberCluster", mcObjRef)
return runtime.Result{}, controller.NewAPIServerError(true, err)
}
klog.V(2).InfoS("The member cluster namespace is not found, remove the finalizer", "memberCluster", mcObjRef)
controllerutil.RemoveFinalizer(mc, placementv1beta1.MemberClusterFinalizer)
return runtime.Result{}, controller.NewUpdateIgnoreConflictError(r.Update(ctx, mc))
}
// check if the namespace is being deleted already, just wait for it to be deleted
if !currentNS.DeletionTimestamp.IsZero() {
klog.V(2).InfoS("The member cluster namespace is still being deleted", "memberCluster", mcObjRef, "deleteTimestamp", currentNS.DeletionTimestamp)
var stuckErr error
if time.Now().After(currentNS.DeletionTimestamp.Add(15 * time.Minute)) {
// alert if the namespace is stuck in deleting for more than 15 minutes
stuckErr = controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to delete the member cluster namespace %s, it has been deleting since %s", namespaceName, currentNS.DeletionTimestamp.Format(time.RFC3339)))
}
return runtime.Result{RequeueAfter: time.Second}, stuckErr
}
currentImc := &clusterv1beta1.InternalMemberCluster{}
imcNamespacedName := types.NamespacedName{Namespace: namespaceName, Name: mc.Name}
if err := r.Client.Get(ctx, imcNamespacedName, currentImc); err != nil {
if !apierrors.IsNotFound(err) {
klog.ErrorS(err, "Failed to get internal member cluster", "internalMemberCluster", imcNamespacedName)
return runtime.Result{}, controller.NewAPIServerError(true, err)
}
// this is possible since we garbage collect the internal member cluster first before deleting the MC
klog.V(2).Info("InternalMemberCluster not found, start garbage collecting", "memberCluster", mcObjRef)
return runtime.Result{Requeue: true}, r.garbageCollect(ctx, mc)
}
// calculate the current status of the member cluster from imc status
r.syncInternalMemberClusterStatus(currentImc, mc)
// check if the cluster is already left
mcJoinedCondition := meta.FindStatusCondition(mc.Status.Conditions, string(clusterv1beta1.ConditionTypeMemberClusterJoined))
if condition.IsConditionStatusFalse(mcJoinedCondition, mc.GetGeneration()) {
klog.V(2).InfoS("Agent already left, start garbage collecting", "memberCluster", mcObjRef)
if gcErr := r.garbageCollect(ctx, mc); gcErr != nil {
return runtime.Result{}, gcErr
}
return runtime.Result{Requeue: true}, controller.NewUpdateIgnoreConflictError(r.updateMemberClusterStatus(ctx, mc))
}
// check to see if we can force delete member cluster.
if currentImc.Spec.State == clusterv1beta1.ClusterStateLeave && time.Since(mc.DeletionTimestamp.Time) >= r.ForceDeleteWaitTime {
klog.V(2).InfoS("Force delete the member cluster, by garbage collecting owned resources", "memberCluster", mcObjRef)
return runtime.Result{Requeue: true}, r.garbageCollect(ctx, mc)
}
klog.V(2).InfoS("Need to wait for the agent to leave", "memberCluster", mcObjRef, "joinedCondition", mcJoinedCondition)
// mark the imc as left to make sure the agent is leaving the fleet
if err := r.leave(ctx, mc, currentImc); err != nil {
klog.ErrorS(err, "Failed to mark the imc as leave", "memberCluster", mcObjRef)
return runtime.Result{}, err
}
// update the mc status to track the leaving status while we wait for all the agents to leave.
// once the imc is updated, the mc controller will reconcile again ,or we reconcile to force delete
// the member cluster after force delete wait time.
return runtime.Result{RequeueAfter: r.ForceDeleteWaitTime}, controller.NewUpdateIgnoreConflictError(r.updateMemberClusterStatus(ctx, mc))
}
// getInternalMemberCluster fetches the InternalMemberCluster named after the
// member cluster from that member cluster's namespace. A missing object is not
// an error: it is reported as (nil, nil). Any other Get failure is logged and
// returned unchanged.
func (r *Reconciler) getInternalMemberCluster(ctx context.Context, name string) (*clusterv1beta1.InternalMemberCluster, error) {
	key := types.NamespacedName{
		Namespace: fmt.Sprintf(utils.NamespaceNameFormat, name),
		Name:      name,
	}
	imc := &clusterv1beta1.InternalMemberCluster{}
	err := r.Client.Get(ctx, key, imc)
	switch {
	case err == nil:
		return imc, nil
	case apierrors.IsNotFound(err):
		return nil, nil
	default:
		klog.ErrorS(err, "Failed to get internal member cluster", "internalMemberCluster", key)
		return nil, err
	}
}
// garbageCollectWork strips the finalizers from every Work object found in the
// given cluster namespace. The updates run concurrently in an errgroup; the
// group's result is wrapped with NewUpdateIgnoreConflictError before being
// returned. A failure to list the Work objects is logged and returned, except
// for a not-found error, which is ignored.
func (r *Reconciler) garbageCollectWork(ctx context.Context, mc *clusterv1beta1.MemberCluster, namespaceName string) error {
	workList := &placementv1beta1.WorkList{}
	if listErr := r.Client.List(ctx, workList, client.InNamespace(namespaceName)); listErr != nil {
		klog.ErrorS(listErr, "Failed to list all the work object", "memberCluster", klog.KObj(mc))
		return client.IgnoreNotFound(listErr)
	}

	group, groupCtx := errgroup.WithContext(ctx)
	for i := range workList.Items {
		// Each goroutine owns its own deep copy of the work it updates.
		target := workList.Items[i].DeepCopy()
		group.Go(func() error {
			target.SetFinalizers(nil)
			err := r.Update(groupCtx, target, &client.UpdateOptions{})
			if err == nil {
				return nil
			}
			klog.ErrorS(err, "Failed to remove the finalizer from the work",
				"memberCluster", klog.KObj(mc), "work", klog.KObj(target))
			return err
		})
	}
	klog.V(2).InfoS("Try to remove all the work finalizers in the cluster namespace",
		"memberCluster", klog.KObj(mc), "number of work", len(workList.Items))
	return controller.NewUpdateIgnoreConflictError(group.Wait())
}
// garbageCollect cleans up the member cluster's namespace: it reads the
// namespace, removes the finalizers from the Work objects inside it (see
// garbageCollectWork) and then deletes the namespace itself. Any failure stops
// the sequence and is returned, including a failure to find the namespace.
func (r *Reconciler) garbageCollect(ctx context.Context, mc *clusterv1beta1.MemberCluster) error {
	nsName := fmt.Sprintf(utils.NamespaceNameFormat, mc.Name)

	ns := &corev1.Namespace{}
	if getErr := r.Client.Get(ctx, types.NamespacedName{Name: nsName}, ns); getErr != nil {
		klog.ErrorS(getErr, "Failed to get the member cluster namespace", "memberCluster", klog.KObj(mc))
		return controller.NewAPIServerError(true, getErr)
	}

	if workErr := r.garbageCollectWork(ctx, mc, nsName); workErr != nil {
		return workErr
	}

	if deleteErr := r.Delete(ctx, ns); deleteErr != nil {
		klog.ErrorS(deleteErr, "Failed to remove the cluster namespace", "memberCluster", klog.KObj(mc), "namespace", nsName)
		return controller.NewAPIServerError(false, deleteErr)
	}
	klog.V(2).InfoS("Deleted the member cluster namespace", "memberCluster", klog.KObj(mc))
	return nil
}
// ensureFinalizer adds the member cluster finalizer to the MemberCluster and
// persists it with an Update when it is not present yet; it is a no-op when
// the finalizer is already there.
func (r *Reconciler) ensureFinalizer(ctx context.Context, mc *clusterv1beta1.MemberCluster) error {
	if !controllerutil.ContainsFinalizer(mc, placementv1beta1.MemberClusterFinalizer) {
		// Note: the message is logged before the update is sent.
		klog.InfoS("Added the member cluster finalizer", "memberCluster", klog.KObj(mc))
		controllerutil.AddFinalizer(mc, placementv1beta1.MemberClusterFinalizer)
		return r.Update(ctx, mc, client.FieldOwner(utils.MCControllerFieldManagerName))
	}
	return nil
}
// join prepares the hub cluster for the member cluster to join. In order, it:
//   - syncs the namespace reserved for the member cluster,
//   - syncs the role and role binding that let the member cluster access that namespace,
//   - syncs the InternalMemberCluster spec from the MemberCluster,
//   - marks the ReadyToJoin condition as true on the in-memory MemberCluster.
//
// The first failing step aborts the sequence and its error is returned wrapped;
// ReadyToJoin is therefore only set once every step has succeeded. join itself
// never sets ReadyToJoin to false.
func (r *Reconciler) join(ctx context.Context, mc *clusterv1beta1.MemberCluster, imc *clusterv1beta1.InternalMemberCluster) error {
	klog.V(2).InfoS("join", "memberCluster", klog.KObj(mc))

	nsName, nsErr := r.syncNamespace(ctx, mc)
	if nsErr != nil {
		return fmt.Errorf("failed to sync namespace: %w", nsErr)
	}

	role, roleErr := r.syncRole(ctx, mc, nsName)
	if roleErr != nil {
		return fmt.Errorf("failed to sync role: %w", roleErr)
	}

	if bindingErr := r.syncRoleBinding(ctx, mc, nsName, role); bindingErr != nil {
		return fmt.Errorf("failed to sync role binding: %w", bindingErr)
	}

	if _, imcErr := r.syncInternalMemberCluster(ctx, mc, nsName, imc); imcErr != nil {
		return fmt.Errorf("failed to sync internal member cluster spec: %w", imcErr)
	}

	markMemberClusterReadyToJoin(r.recorder, mc)
	return nil
}
// leave re-syncs the InternalMemberCluster spec from the MemberCluster. For a
// MemberCluster that carries a deletion timestamp, syncInternalMemberCluster
// sets the state to Leave, which is how the member cluster is told to leave.
//
// leave does not delete any of the resources created by join.
func (r *Reconciler) leave(ctx context.Context, mc *clusterv1beta1.MemberCluster, imc *clusterv1beta1.InternalMemberCluster) error {
	klog.V(2).InfoS("Mark the internal cluster state as `Leave`", "memberCluster", klog.KObj(mc))

	nsName := fmt.Sprintf(utils.NamespaceNameFormat, mc.Name)
	_, syncErr := r.syncInternalMemberCluster(ctx, mc, nsName, imc)
	if syncErr == nil {
		return nil
	}
	return fmt.Errorf("failed to sync internal member cluster spec: %w", syncErr)
}
// syncNamespace creates or updates the namespace for member cluster.
func (r *Reconciler) syncNamespace(ctx context.Context, mc *clusterv1beta1.MemberCluster) (string, error) {
klog.V(2).InfoS("Sync the namespace for the member cluster", "memberCluster", klog.KObj(mc))
namespaceName := fmt.Sprintf(utils.NamespaceNameFormat, mc.Name)
fleetNamespaceLabelValue := "true"
expectedNS := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespaceName,
OwnerReferences: []metav1.OwnerReference{*toOwnerReference(mc)},
Labels: map[string]string{placementv1beta1.FleetResourceLabelKey: fleetNamespaceLabelValue},
},
}
// Creates namespace if not found.
var currentNS corev1.Namespace
if err := r.Client.Get(ctx, types.NamespacedName{Name: namespaceName}, ¤tNS); err != nil {
if !apierrors.IsNotFound(err) {
return "", fmt.Errorf("failed to get namespace %s: %w", namespaceName, err)
}
klog.V(2).InfoS("creating namespace", "memberCluster", klog.KObj(mc), "namespace", namespaceName)
// Make sure the entire namespace is removed if the member cluster is deleted.
if err = r.Client.Create(ctx, &expectedNS, client.FieldOwner(utils.MCControllerFieldManagerName)); err != nil {
return "", fmt.Errorf("failed to create namespace %s: %w", namespaceName, err)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonNamespaceCreated, "Namespace was created")
klog.V(2).InfoS("created namespace", "memberCluster", klog.KObj(mc), "namespace", namespaceName)
return namespaceName, nil
}
// migration: To add new label to all existing member cluster namespaces.
if currentNS.GetLabels()[placementv1beta1.FleetResourceLabelKey] == "" {
klog.V(2).InfoS("patching namespace", "memberCluster", klog.KObj(mc), "namespace", namespaceName)
patch := client.MergeFrom(currentNS.DeepCopy())
currentNS.ObjectMeta.Labels[placementv1beta1.FleetResourceLabelKey] = fleetNamespaceLabelValue
if err := r.Client.Patch(ctx, ¤tNS, patch, client.FieldOwner(utils.MCControllerFieldManagerName)); err != nil {
return "", fmt.Errorf("failed to patch namespace %s: %w", namespaceName, err)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonNamespacePatched, "Namespace was patched")
klog.V(2).InfoS("patched namespace", "memberCluster", klog.KObj(mc), "namespace", namespaceName)
}
return namespaceName, nil
}
// syncRole creates or updates the role for member cluster to access its namespace in hub cluster.
func (r *Reconciler) syncRole(ctx context.Context, mc *clusterv1beta1.MemberCluster, namespaceName string) (string, error) {
klog.V(2).InfoS("Sync the role for the member cluster", "memberCluster", klog.KObj(mc))
// Role name is created using member cluster name.
roleName := fmt.Sprintf(utils.RoleNameFormat, mc.Name)
expectedRole := rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: roleName,
Namespace: namespaceName,
OwnerReferences: []metav1.OwnerReference{*toOwnerReference(mc)},
},
Rules: []rbacv1.PolicyRule{utils.FleetClusterRule, utils.FleetPlacementRule, utils.FleetNetworkRule, utils.EventRule},
}
// Creates role if not found.
var currentRole rbacv1.Role
if err := r.Client.Get(ctx, types.NamespacedName{Name: roleName, Namespace: namespaceName}, ¤tRole); err != nil {
if !apierrors.IsNotFound(err) {
return "", fmt.Errorf("failed to get role %s: %w", roleName, err)
}
klog.V(2).InfoS("creating role", "memberCluster", klog.KObj(mc), "role", roleName)
if err = r.Client.Create(ctx, &expectedRole, client.FieldOwner(utils.MCControllerFieldManagerName)); err != nil {
return "", fmt.Errorf("failed to create role %s with rules %+v: %w", roleName, expectedRole.Rules, err)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonRoleCreated, "role was created")
klog.V(2).InfoS("created role", "memberCluster", klog.KObj(mc), "role", roleName)
return roleName, nil
}
// Updates role if currentRole != expectedRole.
if reflect.DeepEqual(currentRole.Rules, expectedRole.Rules) {
return roleName, nil
}
currentRole.Rules = expectedRole.Rules
klog.V(2).InfoS("updating role", "memberCluster", klog.KObj(mc), "role", roleName)
if err := r.Client.Update(ctx, ¤tRole, client.FieldOwner(utils.MCControllerFieldManagerName)); err != nil {
return "", fmt.Errorf("failed to update role %s with rules %+v: %w", roleName, currentRole.Rules, err)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonRoleUpdated, "role was updated")
klog.V(2).InfoS("updated role", "memberCluster", klog.KObj(mc), "role", roleName)
return roleName, nil
}
// syncRoleBinding creates or updates the role binding for member cluster to access its namespace in hub cluster.
func (r *Reconciler) syncRoleBinding(ctx context.Context, mc *clusterv1beta1.MemberCluster, namespaceName string, roleName string) error {
klog.V(2).InfoS("Sync the roleBinding for the member cluster", "memberCluster", klog.KObj(mc))
// Role binding name is created using member cluster name
roleBindingName := fmt.Sprintf(utils.RoleBindingNameFormat, mc.Name)
expectedRoleBinding := rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: roleBindingName,
Namespace: namespaceName,
OwnerReferences: []metav1.OwnerReference{*toOwnerReference(mc)},
},
Subjects: []rbacv1.Subject{mc.Spec.Identity},
RoleRef: rbacv1.RoleRef{
APIGroup: rbacv1.GroupName,
Kind: "Role",
Name: roleName,
},
}
// Creates role binding if not found.
var currentRoleBinding rbacv1.RoleBinding
if err := r.Client.Get(ctx, types.NamespacedName{Name: roleBindingName, Namespace: namespaceName}, ¤tRoleBinding); err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get role binding %s: %w", roleBindingName, err)
}
klog.V(2).InfoS("creating role binding", "memberCluster", klog.KObj(mc), "subject", mc.Spec.Identity)
if err = r.Client.Create(ctx, &expectedRoleBinding, client.FieldOwner(utils.MCControllerFieldManagerName)); err != nil {
return fmt.Errorf("failed to create role binding %s: %w", roleBindingName, err)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonRoleBindingCreated, "role binding was created")
klog.V(2).InfoS("created role binding", "memberCluster", klog.KObj(mc), "subject", mc.Spec.Identity)
return nil
}
// Updates role binding if currentRoleBinding != expectedRoleBinding.
if reflect.DeepEqual(currentRoleBinding.Subjects, expectedRoleBinding.Subjects) && reflect.DeepEqual(currentRoleBinding.RoleRef, expectedRoleBinding.RoleRef) {
return nil
}
currentRoleBinding.Subjects = expectedRoleBinding.Subjects
currentRoleBinding.RoleRef = expectedRoleBinding.RoleRef
klog.V(2).InfoS("updating role binding", "memberCluster", klog.KObj(mc), "subject", mc.Spec.Identity)
if err := r.Client.Update(ctx, &expectedRoleBinding, client.FieldOwner(utils.MCControllerFieldManagerName)); err != nil {
return fmt.Errorf("failed to update role binding %s: %w", roleBindingName, err)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonRoleBindingUpdated, "role binding was updated")
klog.V(2).InfoS("updated role binding", "memberCluster", klog.KObj(mc), "subject", mc.Spec.Identity)
return nil
}
// syncInternalMemberCluster makes the InternalMemberCluster spec follow the
// MemberCluster. The desired spec carries the MemberCluster's heartbeat period
// and a state of Join, or Leave once the MemberCluster has a deletion timestamp.
//
// With a nil currentImc the object is created (owned by the MemberCluster) in
// namespaceName and returned. Otherwise currentImc is updated only when its
// spec differs from the desired one, and currentImc is returned. Create/update
// failures are returned wrapped, with a nil object.
func (r *Reconciler) syncInternalMemberCluster(ctx context.Context, mc *clusterv1beta1.MemberCluster,
	namespaceName string, currentImc *clusterv1beta1.InternalMemberCluster) (*clusterv1beta1.InternalMemberCluster, error) {
	klog.V(2).InfoS("Sync internalMemberCluster spec from member cluster", "memberCluster", klog.KObj(mc))

	desired := &clusterv1beta1.InternalMemberCluster{
		ObjectMeta: metav1.ObjectMeta{
			Name:            mc.Name,
			Namespace:       namespaceName,
			OwnerReferences: []metav1.OwnerReference{*toOwnerReference(mc)},
		},
		Spec: clusterv1beta1.InternalMemberClusterSpec{
			HeartbeatPeriodSeconds: mc.Spec.HeartbeatPeriodSeconds,
		},
	}
	if deleting := !mc.GetDeletionTimestamp().IsZero(); deleting {
		desired.Spec.State = clusterv1beta1.ClusterStateLeave
	} else {
		desired.Spec.State = clusterv1beta1.ClusterStateJoin
	}

	// No InternalMemberCluster yet: create it.
	if currentImc == nil {
		klog.V(2).InfoS("creating internal member cluster", "InternalMemberCluster", klog.KObj(desired), "spec", desired.Spec)
		createErr := r.Client.Create(ctx, desired, client.FieldOwner(utils.MCControllerFieldManagerName))
		if createErr != nil {
			return nil, controller.NewAPIServerError(false, fmt.Errorf("failed to create internal member cluster %s with spec %+v: %w", klog.KObj(desired), desired.Spec, createErr))
		}
		r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonIMCCreated, "Internal member cluster was created")
		klog.V(2).InfoS("created internal member cluster", "InternalMemberCluster", klog.KObj(desired), "spec", desired.Spec)
		return desired, nil
	}

	// Existing InternalMemberCluster: only touch it when the spec drifted.
	if !reflect.DeepEqual(currentImc.Spec, desired.Spec) {
		currentImc.Spec = desired.Spec
		klog.V(2).InfoS("updating internal member cluster", "InternalMemberCluster", klog.KObj(currentImc), "spec", currentImc.Spec)
		updateErr := r.Client.Update(ctx, currentImc, client.FieldOwner(utils.MCControllerFieldManagerName))
		if updateErr != nil {
			return nil, controller.NewAPIServerError(false, fmt.Errorf("failed to update internal member cluster %s with spec %+v: %w", klog.KObj(currentImc), currentImc.Spec, updateErr))
		}
		r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonIMCSpecUpdated, "internal member cluster spec updated")
		klog.V(2).InfoS("updated internal member cluster", "InternalMemberCluster", klog.KObj(currentImc), "spec", currentImc.Spec)
	}
	return currentImc, nil
}
// toOwnerReference builds a controller owner reference (Controller=true) that
// points at the given MemberCluster by name and UID.
func toOwnerReference(memberCluster *clusterv1beta1.MemberCluster) *metav1.OwnerReference {
	ownerRef := metav1.OwnerReference{
		APIVersion: clusterv1beta1.GroupVersion.String(),
		Kind:       clusterv1beta1.MemberClusterKind,
		Name:       memberCluster.Name,
		UID:        memberCluster.UID,
		Controller: ptr.To(true),
	}
	return &ownerRef
}
// syncInternalMemberClusterStatus copies the status reported on the
// InternalMemberCluster into the in-memory MemberCluster: agent status (from
// which the joined condition is aggregated), resource usage, the additional
// conditions and the cluster properties. It does nothing when imc is nil and
// it does not persist anything.
func (r *Reconciler) syncInternalMemberClusterStatus(imc *clusterv1beta1.InternalMemberCluster, mc *clusterv1beta1.MemberCluster) {
	klog.V(2).InfoS("Sync the internalMemberCluster status", "memberCluster", klog.KObj(mc))
	if imc == nil {
		return
	}

	// TODO: We didn't handle condition type: clusterv1beta1.ConditionTypeMemberClusterHealthy.
	// The joined condition is derived from the agent status, so copy that first.
	mc.Status.AgentStatus = imc.Status.AgentStatus
	r.aggregateJoinedCondition(mc)

	mc.Status.ResourceUsage = imc.Status.ResourceUsage

	// Each copied condition is stamped with the MemberCluster generation.
	for _, imcCond := range imc.Status.Conditions {
		imcCond.ObservedGeneration = mc.GetGeneration()
		meta.SetStatusCondition(&mc.Status.Conditions, imcCond)
	}

	mc.Status.Properties = imc.Status.Properties
}
// updateMemberClusterStatus writes the MemberCluster status through the status
// sub-resource client. The write is retried, using retry.DefaultRetry with its
// Cap set to half the heartbeat period (integer seconds), only when the error
// is ServiceUnavailable, ServerTimeout or TooManyRequests; any other error is
// returned right away.
func (r *Reconciler) updateMemberClusterStatus(ctx context.Context, mc *clusterv1beta1.MemberCluster) error {
	klog.V(2).InfoS("Update the memberCluster status", "memberCluster", klog.KObj(mc))

	backoff := retry.DefaultRetry
	backoff.Cap = time.Duration(mc.Spec.HeartbeatPeriodSeconds/2) * time.Second

	retriable := func(err error) bool {
		switch {
		case apierrors.IsServiceUnavailable(err), apierrors.IsServerTimeout(err), apierrors.IsTooManyRequests(err):
			return true
		default:
			return false
		}
	}
	update := func() error {
		return r.Client.Status().Update(ctx, mc)
	}
	return retry.OnError(backoff, retriable, update)
}
// aggregateJoinedCondition derives the MemberCluster joined condition from the
// AgentJoined condition of every expected agent:
//   - joined when all expected agents report AgentJoined=True,
//   - left when all expected agents report AgentJoined=False,
//   - unknown otherwise, including when an expected agent has not reported or
//     has no AgentJoined condition.
//
// Agent types that are not in r.agents are skipped.
func (r *Reconciler) aggregateJoinedCondition(mc *clusterv1beta1.MemberCluster) {
	klog.V(2).InfoS("Aggregate joined condition from all agents", "memberCluster", klog.KObj(mc))

	// Fewer statuses than expected agents: at least one agent has not reported.
	if len(mc.Status.AgentStatus) < len(r.agents) {
		markMemberClusterUnknown(r.recorder, mc)
		return
	}

	allJoined, allLeft := true, true
	seen := make(map[clusterv1beta1.AgentType]bool)
	for i := range mc.Status.AgentStatus {
		status := &mc.Status.AgentStatus[i]
		if expected := r.agents[status.Type]; !expected {
			// NOTE(review): the error value is built and dropped; presumably its
			// constructor has a useful side effect — confirm.
			_ = controller.NewUnexpectedBehaviorError(fmt.Errorf("find an unexpected agent type %s for member cluster %s", status.Type, mc.Name))
			continue // ignore any unexpected agent type
		}
		agentJoined := meta.FindStatusCondition(status.Conditions, string(clusterv1beta1.AgentJoined))
		if agentJoined == nil {
			markMemberClusterUnknown(r.recorder, mc)
			return
		}
		allJoined = allJoined && agentJoined.Status == metav1.ConditionTrue
		allLeft = allLeft && agentJoined.Status == metav1.ConditionFalse
		seen[status.Type] = true
	}

	// Some expected agent type never showed up among the reported statuses.
	if len(seen) < len(r.agents) {
		markMemberClusterUnknown(r.recorder, mc)
		return
	}

	switch {
	case allJoined && !allLeft:
		markMemberClusterJoined(r.recorder, mc)
	case allLeft && !allJoined:
		markMemberClusterLeft(r.recorder, mc)
	default:
		markMemberClusterUnknown(r.recorder, mc)
	}
}
// markMemberClusterReadyToJoin sets the ReadyToJoin condition of the member
// cluster to true for its current generation. An event is recorded only when
// the condition was absent or previously had a different status.
func markMemberClusterReadyToJoin(recorder record.EventRecorder, mc apis.ConditionedObj) {
	klog.V(2).InfoS("Mark the member cluster ReadyToJoin", "memberCluster", klog.KObj(mc))

	readyCond := metav1.Condition{
		Type:               string(clusterv1beta1.ConditionTypeMemberClusterReadyToJoin),
		Status:             metav1.ConditionTrue,
		Reason:             reasonMemberClusterReadyToJoin,
		ObservedGeneration: mc.GetGeneration(),
	}

	// Announce only actual transitions.
	previous := mc.GetCondition(readyCond.Type)
	unchanged := previous != nil && previous.Status == readyCond.Status
	if !unchanged {
		recorder.Event(mc, corev1.EventTypeNormal, reasonMemberClusterReadyToJoin, "member cluster ready to join")
		klog.V(2).InfoS("member cluster ready to join", "memberCluster", klog.KObj(mc))
	}
	mc.SetConditions(readyCond)
}
// markMemberClusterJoined sets the Joined condition of the member cluster to
// true for its current generation. When the condition was absent or previously
// had a different status, an event is recorded and the join metric is reported.
func markMemberClusterJoined(recorder record.EventRecorder, mc apis.ConditionedObj) {
	klog.V(2).InfoS("Mark the member cluster joined", "memberCluster", klog.KObj(mc))

	joinedCond := metav1.Condition{
		Type:               string(clusterv1beta1.ConditionTypeMemberClusterJoined),
		Status:             metav1.ConditionTrue,
		Reason:             reasonMemberClusterJoined,
		ObservedGeneration: mc.GetGeneration(),
	}

	// Announce only actual transitions.
	previous := mc.GetCondition(joinedCond.Type)
	unchanged := previous != nil && previous.Status == joinedCond.Status
	if !unchanged {
		recorder.Event(mc, corev1.EventTypeNormal, reasonMemberClusterJoined, "member cluster joined")
		klog.V(2).InfoS("memberCluster joined", "memberCluster", klog.KObj(mc))
		metrics.ReportJoinResultMetric()
	}
	mc.SetConditions(joinedCond)
}
// markMemberClusterLeft marks the member cluster as left: the Joined condition is set to false
// and the ReadyToJoin condition is set to false as well, both for the current generation.
// When the Joined condition was absent or previously had a different status, a
// "member cluster left" event is recorded with the MemberClusterLeft reason and the leave
// metric is reported.
func markMemberClusterLeft(recorder record.EventRecorder, mc apis.ConditionedObj) {
	klog.V(2).InfoS("Mark the member cluster left", "memberCluster", klog.KObj(mc))
	newCondition := metav1.Condition{
		Type:               string(clusterv1beta1.ConditionTypeMemberClusterJoined),
		Status:             metav1.ConditionFalse,
		Reason:             reasonMemberClusterLeft,
		ObservedGeneration: mc.GetGeneration(),
	}
	notReadyCondition := metav1.Condition{
		Type:               string(clusterv1beta1.ConditionTypeMemberClusterReadyToJoin),
		Status:             metav1.ConditionFalse,
		Reason:             reasonMemberClusterNotReadyToJoin,
		ObservedGeneration: mc.GetGeneration(),
	}
	// Joined status changed.
	existingCondition := mc.GetCondition(newCondition.Type)
	if existingCondition == nil || existingCondition.Status != newCondition.Status {
		// The event reason matches the condition reason (left, not joined).
		recorder.Event(mc, corev1.EventTypeNormal, reasonMemberClusterLeft, "member cluster left")
		klog.V(2).InfoS("memberCluster left", "memberCluster", klog.KObj(mc))
		metrics.ReportLeaveResultMetric()
	}
	mc.SetConditions(newCondition, notReadyCondition)
}
// markMemberClusterUnknown sets the Joined condition of the member cluster to
// unknown for its current generation. A warning event is recorded only when
// the condition was absent or previously had a different status.
func markMemberClusterUnknown(recorder record.EventRecorder, mc apis.ConditionedObj) {
	klog.V(2).InfoS("Mark the member cluster join condition unknown", "memberCluster", klog.KObj(mc))

	unknownCond := metav1.Condition{
		Type:               string(clusterv1beta1.ConditionTypeMemberClusterJoined),
		Status:             metav1.ConditionUnknown,
		Reason:             reasonMemberClusterUnknown,
		ObservedGeneration: mc.GetGeneration(),
	}

	// Announce only actual transitions.
	previous := mc.GetCondition(unknownCond.Type)
	unchanged := previous != nil && previous.Status == unknownCond.Status
	if !unchanged {
		recorder.Event(mc, corev1.EventTypeWarning, reasonMemberClusterUnknown, "member cluster join state unknown")
		klog.V(2).InfoS("memberCluster join state unknown", "memberCluster", klog.KObj(mc))
	}
	mc.SetConditions(unknownCond)
}
// SetupWithManager registers the reconciler with the manager under the given
// controller name. It also initializes the event recorder and the set of
// expected agents: the member agent always, plus the two networking agents
// when NetworkingAgentsEnabled is set.
//
// The controller watches MemberCluster objects, filtered by a generation-changed
// predicate, and the InternalMemberCluster objects they own.
func (r *Reconciler) SetupWithManager(mgr runtime.Manager, name string) error {
	r.recorder = mgr.GetEventRecorderFor("mcv1beta1")

	r.agents = map[clusterv1beta1.AgentType]bool{
		clusterv1beta1.MemberAgent: true,
	}
	if r.NetworkingAgentsEnabled {
		networkingAgents := []clusterv1beta1.AgentType{
			clusterv1beta1.MultiClusterServiceAgent,
			clusterv1beta1.ServiceExportImportAgent,
		}
		for _, agentType := range networkingAgents {
			r.agents[agentType] = true
		}
	}

	// Cap the number of concurrent reconciles.
	options := ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}
	memberClusterPredicates := builder.WithPredicates(predicate.GenerationChangedPredicate{})

	return runtime.NewControllerManagedBy(mgr).
		Named(name).
		WithOptions(options).
		For(&clusterv1beta1.MemberCluster{}, memberClusterPredicates).
		Owns(&clusterv1beta1.InternalMemberCluster{}).
		Complete(r)
}