pkg/controllers/workgenerator/override.go (211 lines of code) (raw):
/*
Copyright 2025 The KubeFleet Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workgenerator
import (
"context"
"encoding/json"
"fmt"
"strings"
jsonpatch "github.com/evanphx/json-patch/v5"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1"
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/overrider"
)
// fetchClusterResourceOverrideSnapshots gets every ClusterResourceOverrideSnapshot listed in
// resourceBinding.Spec.ClusterResourceOverrideSnapshots and indexes them by the resources they select.
//
// The returned map is keyed by the group/version/kind/name of each cluster resource selector found in a
// snapshot; a snapshot with several selectors is listed under each of them. Snapshots under the same key
// keep the order in which they appear in the binding spec.
//
// It returns
//   - a user error if any of the listed snapshots cannot be found.
//   - an API server error if a snapshot cannot be fetched for any other reason.
func (r *Reconciler) fetchClusterResourceOverrideSnapshots(ctx context.Context, resourceBinding *placementv1beta1.ClusterResourceBinding) (map[placementv1beta1.ResourceIdentifier][]*placementv1alpha1.ClusterResourceOverrideSnapshot, error) {
	croMap := make(map[placementv1beta1.ResourceIdentifier][]*placementv1alpha1.ClusterResourceOverrideSnapshot)

	// For now, we get the snapshots sequentially. We can optimize this by getting them in parallel, but we need to reorder
	// the snapshot lists saved in the map.
	for _, name := range resourceBinding.Spec.ClusterResourceOverrideSnapshots {
		snapshot := &placementv1alpha1.ClusterResourceOverrideSnapshot{}
		if err := r.Client.Get(ctx, types.NamespacedName{Name: name}, snapshot); err != nil {
			if errors.IsNotFound(err) {
				klog.ErrorS(err, "The clusterResourceOverrideSnapshot is deleted", "resourceBinding", klog.KObj(resourceBinding), "clusterResourceOverrideSnapshot", name)
				// It could be caused by that the user updates the override too frequently and the snapshot has been replaced
				// by the new one.
				// TODO: support customized revision history limit
				return nil, controller.NewUserError(fmt.Errorf("clusterResourceOverrideSnapshot %s is not found", name))
			}
			klog.ErrorS(err, "Failed to get the clusterResourceOverrideSnapshot",
				"resourceBinding", klog.KObj(resourceBinding), "clusterResourceOverrideSnapshot", name)
			return nil, controller.NewAPIServerError(true, err)
		}

		for _, selector := range snapshot.Spec.OverrideSpec.ClusterResourceSelectors {
			// Note, we only support name selector here.
			key := placementv1beta1.ResourceIdentifier{
				Group:   selector.Group,
				Version: selector.Version,
				Kind:    selector.Kind,
				Name:    selector.Name,
			}
			croMap[key] = append(croMap[key], snapshot)
		}
	}

	klog.V(2).InfoS("Fetched clusterResourceOverrideSnapshots", "resourceBinding", klog.KObj(resourceBinding), "numberOfResources", len(croMap))
	return croMap, nil
}
// fetchResourceOverrideSnapshots gets every ResourceOverrideSnapshot listed in
// resourceBinding.Spec.ResourceOverrideSnapshots and indexes them by the resources they select.
//
// The returned map is keyed by the group/version/kind/name of each resource selector found in a snapshot,
// combined with the namespace of the snapshot itself; a snapshot with several selectors is listed under
// each of them. Snapshots under the same key keep the order in which they appear in the binding spec.
//
// It returns
//   - a user error if any of the listed snapshots cannot be found.
//   - an API server error if a snapshot cannot be fetched for any other reason.
func (r *Reconciler) fetchResourceOverrideSnapshots(ctx context.Context, resourceBinding *placementv1beta1.ClusterResourceBinding) (map[placementv1beta1.ResourceIdentifier][]*placementv1alpha1.ResourceOverrideSnapshot, error) {
	roMap := make(map[placementv1beta1.ResourceIdentifier][]*placementv1alpha1.ResourceOverrideSnapshot)

	// For now, we get the snapshots sequentially. We can optimize this by getting them in parallel, but we need to reorder
	// the snapshot lists saved in the map.
	for _, namespacedName := range resourceBinding.Spec.ResourceOverrideSnapshots {
		snapshot := &placementv1alpha1.ResourceOverrideSnapshot{}
		if err := r.Client.Get(ctx, types.NamespacedName{Name: namespacedName.Name, Namespace: namespacedName.Namespace}, snapshot); err != nil {
			if errors.IsNotFound(err) {
				// It could be caused by that the user updates the override too frequently and the snapshot has been replaced
				// by the new one.
				// TODO: support customized revision history limit
				klog.ErrorS(err, "The resourceOverrideSnapshot is deleted", "resourceBinding", klog.KObj(resourceBinding), "resourceOverrideSnapshot", namespacedName)
				return nil, controller.NewUserError(fmt.Errorf("resourceOverrideSnapshot %s is not found", namespacedName))
			}
			klog.ErrorS(err, "Failed to get the resourceOverrideSnapshot",
				"resourceBinding", klog.KObj(resourceBinding), "resourceOverrideSnapshot", namespacedName)
			return nil, controller.NewAPIServerError(true, err)
		}

		for _, selector := range snapshot.Spec.OverrideSpec.ResourceSelectors {
			// The selector carries no namespace of its own; the namespace of the snapshot is used for the key.
			key := placementv1beta1.ResourceIdentifier{
				Group:     selector.Group,
				Version:   selector.Version,
				Kind:      selector.Kind,
				Name:      selector.Name,
				Namespace: snapshot.Namespace,
			}
			roMap[key] = append(roMap[key], snapshot)
		}
	}

	klog.V(2).InfoS("Fetched resourceOverrideSnapshots", "resourceBinding", klog.KObj(resourceBinding), "numberOfResources", len(roMap))
	return roMap, nil
}
// applyOverrides runs all the overrides that select the given resource against it, for the given member cluster.
// A resource may be selected by ClusterResourceOverrides, by ResourceOverrides, or by both.
//
// ClusterResourceOverrideSnapshots are looked up by the resource itself when it is cluster scoped, or by its
// namespace otherwise, and are applied first. ResourceOverrideSnapshots are only considered for namespace
// scoped resources and are applied afterwards, so that they win when both kinds touch the same field.
// Snapshots without a policy are logged and skipped.
//
// It returns
//   - true if resource.Raw is nil once all the overrides have been applied, i.e. the resource is deleted by the overrides.
//   - an error if the resource content cannot be decoded or the override rules cannot be applied.
func (r *Reconciler) applyOverrides(resource *placementv1beta1.ResourceContent, cluster *clusterv1beta1.MemberCluster,
	croMap map[placementv1beta1.ResourceIdentifier][]*placementv1alpha1.ClusterResourceOverrideSnapshot, roMap map[placementv1beta1.ResourceIdentifier][]*placementv1alpha1.ResourceOverrideSnapshot) (bool, error) {
	// Nothing to do when there is no override at all.
	if len(croMap)+len(roMap) == 0 {
		return false, nil
	}

	var obj unstructured.Unstructured
	if err := obj.UnmarshalJSON(resource.Raw); err != nil {
		klog.ErrorS(err, "Work has invalid content", "selectedResource", resource.Raw)
		return false, controller.NewUnexpectedBehaviorError(err)
	}
	gvk := obj.GetObjectKind().GroupVersionKind()
	clusterScoped := r.InformerManager.IsClusterScopedResources(gvk)

	// A cluster scoped resource is selected by its own identity, while a namespace scoped resource
	// is selected through the namespace it lives in.
	var croKey placementv1beta1.ResourceIdentifier
	if clusterScoped {
		croKey = placementv1beta1.ResourceIdentifier{
			Group:   gvk.Group,
			Version: gvk.Version,
			Kind:    gvk.Kind,
			Name:    obj.GetName(),
		}
	} else {
		croKey = placementv1beta1.ResourceIdentifier{
			Group:   utils.NamespaceMetaGVK.Group,
			Version: utils.NamespaceMetaGVK.Version,
			Kind:    utils.NamespaceMetaGVK.Kind,
			Name:    obj.GetNamespace(),
		}
	}

	// Step 1: the ClusterResourceOverrideSnapshots.
	croSnapshots := croMap[croKey]
	for _, cro := range croSnapshots {
		policy := cro.Spec.OverrideSpec.Policy
		if policy == nil {
			// should not happen
			err := fmt.Errorf("invalid clusterResourceOverrideSnapshot %s: policy is nil", cro.Name)
			klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Found an invalid clusterResourceOverrideSnapshot", "clusterResourceOverrideSnapshot", klog.KObj(cro))
			continue
		}
		if err := applyOverrideRules(resource, cluster, policy.OverrideRules); err != nil {
			klog.ErrorS(err, "Failed to apply the override rules", "clusterResourceOverrideSnapshot", klog.KObj(cro))
			return false, err
		}
	}
	klog.V(2).InfoS("Applied clusterResourceOverrideSnapshots", "resource", klog.KObj(&obj), "numberOfOverrides", len(croSnapshots))

	// ResourceOverrides never target cluster scoped resources, so we are done here for them.
	if clusterScoped {
		return resource.Raw == nil, nil
	}

	// Step 2: the ResourceOverrideSnapshots. They run last so that a ResourceOverride wins over
	// a ClusterResourceOverride when resolving conflicts.
	roKey := placementv1beta1.ResourceIdentifier{
		Group:     gvk.Group,
		Version:   gvk.Version,
		Kind:      gvk.Kind,
		Name:      obj.GetName(),
		Namespace: obj.GetNamespace(),
	}
	roSnapshots := roMap[roKey]
	for _, ro := range roSnapshots {
		policy := ro.Spec.OverrideSpec.Policy
		if policy == nil {
			// should not happen
			err := fmt.Errorf("invalid resourceOverrideSnapshot %s: policy is nil", ro.Name)
			klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Found an invalid resourceOverrideSnapshot", "resourceOverrideSnapshot", klog.KObj(ro))
			continue
		}
		if err := applyOverrideRules(resource, cluster, policy.OverrideRules); err != nil {
			klog.ErrorS(err, "Failed to apply the override rules", "resourceOverrideSnapshot", klog.KObj(ro))
			return false, err
		}
	}
	klog.V(2).InfoS("Applied resourceOverrideSnapshots", "resource", klog.KObj(&obj), "numberOfOverrides", len(roSnapshots))

	return resource.Raw == nil, nil
}
// applyOverrideRules goes through the rules in order and applies each rule that matches the given cluster
// to the resource.
//
// A matching rule of the delete type sets resource.Raw to nil and ends the processing right away, leaving
// the remaining rules unevaluated. Any other matching rule is handled as a JSON patch override.
// Rules that do not match the cluster are skipped.
//
// It returns a user error if a rule cannot be evaluated against the cluster or if its JSON patch cannot be applied.
func applyOverrideRules(resource *placementv1beta1.ResourceContent, cluster *clusterv1beta1.MemberCluster, rules []placementv1alpha1.OverrideRule) error {
	for i := range rules {
		rule := rules[i]
		isMatch, matchErr := overrider.IsClusterMatched(cluster, rule)
		switch {
		case matchErr != nil:
			// should not happen though and should be rejected by the webhook
			klog.ErrorS(controller.NewUnexpectedBehaviorError(matchErr), "Found an invalid override rule")
			return controller.NewUserError(matchErr)
		case !isMatch:
			continue
		case rule.OverrideType == placementv1alpha1.DeleteOverrideType:
			// The resource is dropped for this cluster; there is nothing left to patch.
			resource.Raw = nil
			return nil
		}

		// Every other override type falls back to the JSON patch overrides.
		if patchErr := applyJSONPatchOverride(resource, cluster, rule.JSONPatchOverrides); patchErr != nil {
			klog.ErrorS(patchErr, "Failed to apply JSON patch override")
			return controller.NewUserError(patchErr)
		}
	}
	return nil
}
// applyJSONPatchOverride applies a JSON patch on the selected resources following [RFC 6902](https://datatracker.ietf.org/doc/html/rfc6902).
//
// Before the patch is built, the built-in variables found in the value of each override are expanded:
// every occurrence of placementv1alpha1.OverrideClusterNameVariable is replaced with the name of the cluster,
// and every cluster label key variable is replaced with the value of that label on the cluster.
//
// The expanded values are written into a local copy of the overrides, so the slice passed in by the caller
// (which is backed by the override snapshot) is not rewritten with cluster specific values.
// NOTE(review): this assumes JSONPatchOverride.Value is a struct field rather than a pointer - confirm.
//
// On success, resourceContent.Raw is replaced with the patched JSON document. An empty overrides list is a no-op.
// It returns an error if a variable cannot be expanded, or if the patch cannot be marshaled, decoded or applied;
// resourceContent is left untouched in that case.
func applyJSONPatchOverride(resourceContent *placementv1beta1.ResourceContent, cluster *clusterv1beta1.MemberCluster, overrides []placementv1alpha1.JSONPatchOverride) error {
	if len(overrides) == 0 { // do nothing
		return nil
	}

	// go through the JSON patch overrides to replace the built-in variables before json Marshal
	// as it may contain the built-in variables that cannot be marshaled directly
	resolved := make([]placementv1alpha1.JSONPatchOverride, len(overrides))
	copy(resolved, overrides)
	for i := range resolved {
		// Replace the built-in ${MEMBER-CLUSTER-NAME} variable with the actual cluster name
		withClusterName := strings.ReplaceAll(string(resolved[i].Value.Raw), placementv1alpha1.OverrideClusterNameVariable, cluster.Name)
		// Replace label key variables with actual label values
		expanded, err := replaceClusterLabelKeyVariables(withClusterName, cluster)
		if err != nil {
			klog.ErrorS(err, "Failed to replace cluster label key variables in JSON patch override")
			return err
		}
		resolved[i].Value.Raw = []byte(expanded)
	}

	jsonPatchBytes, err := json.Marshal(resolved)
	if err != nil {
		klog.ErrorS(err, "Failed to marshal JSON Patch overrides")
		return err
	}

	patch, err := jsonpatch.DecodePatch(jsonPatchBytes)
	if err != nil {
		klog.ErrorS(err, "Failed to decode the passed JSON document as an RFC 6902 patch")
		return err
	}

	patchedObjectJSONBytes, err := patch.Apply(resourceContent.Raw)
	if err != nil {
		klog.ErrorS(err, "Failed to apply the JSON patch to the resource")
		return err
	}
	resourceContent.Raw = patchedObjectJSONBytes
	return nil
}
// replaceClusterLabelKeyVariables finds all occurrences of the OverrideClusterLabelKeyVariablePrefix pattern
// (e.g. ${MEMBER-CLUSTER-LABEL-KEY-region}) in the input string and replaces them with
// the corresponding label values from the cluster.
//
// The input is processed in a single pass from left to right. The text between the prefix and the next `}`
// is taken as the label key. Label values that are substituted in are copied to the output as is and are
// never scanned for variables again, so a label value cannot trigger further expansion.
//
// It returns an error, together with an empty string, if
//   - a variable is missing its closing `}`, or
//   - a label with the specified key doesn't exist on the cluster.
func replaceClusterLabelKeyVariables(input string, cluster *clusterv1beta1.MemberCluster) (string, error) {
	const closingBracket = "}"
	prefix := placementv1alpha1.OverrideClusterLabelKeyVariablePrefix

	// Fast path: nothing to expand, hand the input back without copying it.
	if !strings.Contains(input, prefix) {
		return input, nil
	}

	var sb strings.Builder
	sb.Grow(len(input))
	remaining := input
	for {
		startIdx := strings.Index(remaining, prefix)
		if startIdx == -1 {
			break
		}
		// The text in front of the variable is kept verbatim.
		sb.WriteString(remaining[:startIdx])

		// extract the key value user wants to replace
		afterPrefix := remaining[startIdx+len(prefix):]
		endIdx := strings.Index(afterPrefix, closingBracket)
		if endIdx == -1 {
			klog.V(2).InfoS("malformed key ${MEMBER-CLUSTER-LABEL-KEY without the closing `}`", "input", input)
			return "", fmt.Errorf("input %s is missing the closing bracket `}`", input)
		}
		keyName := afterPrefix[:endIdx]

		// check if the key exists in the cluster labels
		labelValue, exists := cluster.ObjectMeta.Labels[keyName]
		if !exists {
			klog.V(2).InfoS("Label key not found on cluster", "key", keyName, "cluster", cluster.Name)
			return "", fmt.Errorf("label key %s not found on cluster %s", keyName, cluster.Name)
		}

		// emit the label value in place of the variable and continue right after the closing bracket
		sb.WriteString(labelValue)
		remaining = afterPrefix[endIdx+len(closingBracket):]
	}
	// Whatever is left after the last variable is kept verbatim.
	sb.WriteString(remaining)
	return sb.String(), nil
}