tools/draincluster/drain/drain.go (163 lines of code) (raw):
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
package drain
import (
"context"
"fmt"
"log"
k8errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils/condition"
evictionutils "go.goms.io/fleet/pkg/utils/eviction"
toolsutils "go.goms.io/fleet/tools/utils"
)
const (
uuidLength = 8
drainEvictionNameFormat = "drain-eviction-%s-%s-%s"
resourceIdentifierKeyFormat = "%s/%s/%s/%s/%s"
)
type Helper struct {
HubClient client.Client
ClusterName string
}
func (h *Helper) Drain(ctx context.Context) (bool, error) {
if err := h.cordon(ctx); err != nil {
return false, fmt.Errorf("failed to cordon member cluster %s: %w", h.ClusterName, err)
}
log.Printf("Successfully cordoned member cluster %s by adding cordon taint", h.ClusterName)
crpNameMap, err := h.fetchClusterResourcePlacementNamesToEvict(ctx)
if err != nil {
return false, err
}
if len(crpNameMap) == 0 {
log.Printf("There are currently no resources propagated to %s from fleet using ClusterResourcePlacement resources", h.ClusterName)
return true, nil
}
isDrainSuccessful := true
// create eviction objects for all <crpName, targetCluster>.
for crpName := range crpNameMap {
evictionName, err := generateDrainEvictionName(crpName, h.ClusterName)
if err != nil {
return false, err
}
err = retry.OnError(retry.DefaultBackoff, func(err error) bool {
return k8errors.IsAlreadyExists(err)
}, func() error {
eviction := placementv1beta1.ClusterResourcePlacementEviction{
ObjectMeta: metav1.ObjectMeta{
Name: evictionName,
},
Spec: placementv1beta1.PlacementEvictionSpec{
PlacementName: crpName,
ClusterName: h.ClusterName,
},
}
return h.HubClient.Create(ctx, &eviction)
})
if err != nil {
return false, fmt.Errorf("failed to create eviction for CRP %s: %w", crpName, err)
}
// wait until evictions reach a terminal state.
var eviction placementv1beta1.ClusterResourcePlacementEviction
err = wait.ExponentialBackoffWithContext(ctx, retry.DefaultBackoff, func(ctx context.Context) (bool, error) {
if err := h.HubClient.Get(ctx, types.NamespacedName{Name: evictionName}, &eviction); err != nil {
return false, fmt.Errorf("failed to get eviction %s: %w", evictionName, err)
}
return evictionutils.IsEvictionInTerminalState(&eviction), nil
})
if err != nil {
return false, fmt.Errorf("failed to wait for evictions to reach terminal state: %w", err)
}
// TODO: add safeguards to check if eviction conditions are set to unknown.
validCondition := eviction.GetCondition(string(placementv1beta1.PlacementEvictionConditionTypeValid))
if validCondition != nil && validCondition.Status == metav1.ConditionFalse {
// check to see if CRP is missing or CRP is being deleted or CRB is missing.
if validCondition.Reason == condition.EvictionInvalidMissingCRPMessage ||
validCondition.Reason == condition.EvictionInvalidDeletingCRPMessage ||
validCondition.Reason == condition.EvictionInvalidMissingCRBMessage {
log.Printf("eviction %s is invalid with reason %s for CRP %s, but drain will succeed", evictionName, validCondition.Reason, crpName)
continue
}
}
executedCondition := eviction.GetCondition(string(placementv1beta1.PlacementEvictionConditionTypeExecuted))
if executedCondition == nil || executedCondition.Status == metav1.ConditionFalse {
isDrainSuccessful = false
log.Printf("eviction %s was not executed successfully for CRP %s", evictionName, crpName)
continue
}
log.Printf("eviction %s was executed successfully for CRP %s", evictionName, crpName)
// log each cluster scoped resource evicted for CRP.
clusterScopedResourceIdentifiers, err := h.collectClusterScopedResourcesSelectedByCRP(ctx, crpName)
if err != nil {
log.Printf("failed to collect cluster scoped resources selected by CRP %s: %v", crpName, err)
continue
}
for _, resourceIdentifier := range clusterScopedResourceIdentifiers {
log.Printf("evicted resource %s propagated by CRP %s", generateResourceIdentifierKey(resourceIdentifier), crpName)
}
}
return isDrainSuccessful, nil
}
func (h *Helper) cordon(ctx context.Context) error {
// add taint to member cluster to ensure resources aren't scheduled on it.
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
var mc clusterv1beta1.MemberCluster
if err := h.HubClient.Get(ctx, types.NamespacedName{Name: h.ClusterName}, &mc); err != nil {
return err
}
// search to see cordonTaint already exists on the cluster.
for i := range mc.Spec.Taints {
if mc.Spec.Taints[i] == toolsutils.CordonTaint {
return nil
}
}
// add taint to member cluster to cordon.
mc.Spec.Taints = append(mc.Spec.Taints, toolsutils.CordonTaint)
return h.HubClient.Update(ctx, &mc)
})
}
func (h *Helper) fetchClusterResourcePlacementNamesToEvict(ctx context.Context) (map[string]bool, error) {
var crbList placementv1beta1.ClusterResourceBindingList
if err := h.HubClient.List(ctx, &crbList); err != nil {
return map[string]bool{}, fmt.Errorf("failed to list cluster resource bindings: %w", err)
}
crpNameMap := make(map[string]bool)
// find all unique CRP names for which eviction needs to occur.
for i := range crbList.Items {
crb := crbList.Items[i]
if crb.Spec.TargetCluster == h.ClusterName && crb.DeletionTimestamp == nil {
crpName, ok := crb.GetLabels()[placementv1beta1.CRPTrackingLabel]
if !ok {
return map[string]bool{}, fmt.Errorf("failed to get CRP name from binding %s", crb.Name)
}
crpNameMap[crpName] = true
}
}
return crpNameMap, nil
}
func (h *Helper) collectClusterScopedResourcesSelectedByCRP(ctx context.Context, crpName string) ([]placementv1beta1.ResourceIdentifier, error) {
var crp placementv1beta1.ClusterResourcePlacement
if err := h.HubClient.Get(ctx, types.NamespacedName{Name: crpName}, &crp); err != nil {
return nil, fmt.Errorf("failed to get ClusterResourcePlacement %s: %w", crpName, err)
}
var resourcesPropagated []placementv1beta1.ResourceIdentifier
for _, selectedResource := range crp.Status.SelectedResources {
// only collect cluster scoped resources.
if len(selectedResource.Namespace) == 0 {
resourcesPropagated = append(resourcesPropagated, selectedResource)
}
}
return resourcesPropagated, nil
}
func generateDrainEvictionName(crpName, targetCluster string) (string, error) {
evictionName := fmt.Sprintf(drainEvictionNameFormat, crpName, targetCluster, uuid.NewUUID()[:uuidLength])
if errs := validation.IsQualifiedName(evictionName); len(errs) != 0 {
return "", fmt.Errorf("failed to format a qualified name for drain eviction object with CRP name %s, cluster name %s: %v", crpName, targetCluster, errs)
}
return evictionName, nil
}
func generateResourceIdentifierKey(r placementv1beta1.ResourceIdentifier) string {
if len(r.Group) == 0 && len(r.Namespace) == 0 {
return fmt.Sprintf(resourceIdentifierKeyFormat, "''", r.Version, r.Kind, "''", r.Name)
}
if len(r.Namespace) == 0 {
return fmt.Sprintf(resourceIdentifierKeyFormat, r.Group, r.Version, r.Kind, "''", r.Name)
}
return fmt.Sprintf(resourceIdentifierKeyFormat, r.Group, r.Version, r.Kind, r.Namespace, r.Name)
}