controllers/common.go:
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package controllers
import (
"context"
"errors"
"fmt"
"strings"
"github.com/go-logr/logr"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/targetallocator"
)
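// Default namespace and name of the AmazonCloudWatchAgent custom resource and
// the feature-flag key expected in its JSON configuration.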
const (
acceleratedComputeMetrics = "accelerated_compute_metrics"
amazonCloudWatchNamespace = "amazon-cloudwatch"
amazonCloudWatchAgentName = "cloudwatch-agent"
)
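// isNamespaceScoped returns true for objects that live inside a namespace and
// can therefore carry a controller owner reference; cluster-scoped RBAC
// objects (ClusterRole, ClusterRoleBinding) return false.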
func isNamespaceScoped(obj client.Object) bool {
switch obj.(type) {
case *rbacv1.ClusterRole, *rbacv1.ClusterRoleBinding:
return false
default:
return true
}
}
// BuildCollector returns the manifests generated for a given instance by the collector and target allocator builders, or the first error encountered.
func BuildCollector(params manifests.Params) ([]client.Object, error) {
builders := []manifests.Builder{
collector.Build,
targetallocator.Build,
}
var resources []client.Object
for _, builder := range builders {
objs, err := builder(params)
if err != nil {
return nil, err
}
resources = append(resources, objs...)
}
return resources, nil
}
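// reconcileDesiredObjectUIDs creates or updates every desired object, setting
// the controller owner reference on namespace-scoped objects first, and
// returns the reconciled objects keyed by their cluster-assigned UID.
// An object that fails with an immutable field change is deleted so that it
// can be recreated on the next reconcile.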
func reconcileDesiredObjectUIDs(ctx context.Context, kubeClient client.Client, logger logr.Logger,
owner metav1.Object, scheme *runtime.Scheme, desiredObjects ...client.Object) (map[types.UID]client.Object, error) {
var errs []error
existingObjectMap := make(map[types.UID]client.Object)
var existingObjectList []client.Object
for _, desired := range desiredObjects {
l := logger.WithValues(
"object_name", desired.GetName(),
"object_kind", desired.GetObjectKind(),
)
if isNamespaceScoped(desired) {
if setErr := ctrl.SetControllerReference(owner, desired, scheme); setErr != nil {
l.Error(setErr, "failed to set controller owner reference to desired")
errs = append(errs, setErr)
continue
}
}
// existing is an object that controller-runtime will hydrate for us;
// deep copying the desired object is the most convenient way to obtain it
existing := desired.DeepCopyObject().(client.Object)
existingObjectList = append(existingObjectList, existing) // UIDs are not assigned yet
mutateFn := manifests.MutateFuncFor(existing, desired)
var op controllerutil.OperationResult
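// CreateOrUpdate may race with other writers of the same object; retry on
// optimistic-concurrency conflicts.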
crudErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, createOrUpdateErr := ctrl.CreateOrUpdate(ctx, kubeClient, existing, mutateFn)
op = result
return createOrUpdateErr
})
if crudErr != nil && errors.Is(crudErr, manifests.ImmutableChangeErr) {
l.Error(crudErr, "detected immutable field change, deleting existing object so it can be recreated on the next reconcile", "existing", existing.GetName())
delErr := kubeClient.Delete(ctx, existing)
if delErr != nil {
return nil, delErr
}
continue
} else if crudErr != nil {
l.Error(crudErr, "failed to configure desired")
errs = append(errs, crudErr)
continue
}
l.V(1).Info(fmt.Sprintf("desired has been %s", op))
}
if len(errs) > 0 {
return nil, fmt.Errorf("failed to create objects for %s: %w", owner.GetName(), errors.Join(errs...))
}
for _, obj := range existingObjectList {
existingObjectMap[obj.GetUID()] = obj
}
return existingObjectMap, nil
}
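// reconcileDesiredObjectsWPrune reconciles the desired objects and then prunes
// previously owned objects, looked up via searchOwnedObjectsFunc, that are no
// longer part of the desired state.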
func reconcileDesiredObjectsWPrune(ctx context.Context, kubeClient client.Client, logger logr.Logger, owner v1alpha1.AmazonCloudWatchAgent, scheme *runtime.Scheme,
desiredObjects []client.Object,
searchOwnedObjectsFunc func(ctx context.Context, owner v1alpha1.AmazonCloudWatchAgent) (map[types.UID]client.Object, error),
) error {
previouslyOwnedObjects, err := searchOwnedObjectsFunc(ctx, owner)
if err != nil {
return fmt.Errorf("failed to search owned objects: %w", err)
}
desiredObjectMap, err := reconcileDesiredObjectUIDs(ctx, kubeClient, logger, &owner, scheme, desiredObjects...)
if err != nil {
return fmt.Errorf("failed to reconcile desired objects: %w", err)
}
// Prune owned objects in the cluster which should not be present after the reconciliation.
err = pruneStaleObjects(ctx, kubeClient, logger, previouslyOwnedObjects, desiredObjectMap)
if err != nil {
return fmt.Errorf("failed to prune objects for %s: %w", owner.GetName(), err)
}
return nil
}
// reconcileDesiredObjects runs the reconcile process over the given list of objects, creating or updating each one.
func reconcileDesiredObjects(ctx context.Context, kubeClient client.Client, logger logr.Logger, owner metav1.Object, scheme *runtime.Scheme, desiredObjects ...client.Object) error {
_, err := reconcileDesiredObjectUIDs(ctx, kubeClient, logger, owner, scheme, desiredObjects...)
return err
}
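// pruneStaleObjects deletes previously owned objects whose UIDs do not appear
// in the desired object map.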
func pruneStaleObjects(ctx context.Context, kubeClient client.Client, logger logr.Logger, previouslyOwnedMap, desiredMap map[types.UID]client.Object) error {
// Pruning owned objects in the cluster which should not be present after the reconciliation.
var pruneErrs []error
for uid, obj := range previouslyOwnedMap {
l := logger.WithValues(
"object_name", obj.GetName(),
"object_kind", obj.GetObjectKind().GroupVersionKind().Kind,
)
if _, found := desiredMap[uid]; found {
continue
}
l.Info("pruning stale resource")
err := kubeClient.Delete(ctx, obj)
if err != nil {
l.Error(err, "failed to delete resource")
pruneErrs = append(pruneErrs, err)
}
}
return errors.Join(pruneErrs...)
}
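// enabledAcceleratedComputeByAgentConfig reports whether accelerated compute
// metrics should be enabled based on the cloudwatch-agent custom resource
// configuration. For illustration, it inspects a JSON agent config roughly of
// this shape:
//
//   {"logs": {"metrics_collected": {"kubernetes": {
//       "enhanced_container_insights": true,
//       "accelerated_compute_metrics": false}}}}
//
// Accelerated compute metrics are only considered when enhanced container
// insights is enabled, and an absent accelerated_compute_metrics flag defaults
// to enabled.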
func enabledAcceleratedComputeByAgentConfig(ctx context.Context, c client.Client, log logr.Logger) bool {
agentResource := getAmazonCloudWatchAgentResource(ctx, c)
// a missing feature flag means accelerated compute metrics is on by default
featureConfigExists := strings.Contains(agentResource.Spec.Config, acceleratedComputeMetrics)
conf, err := adapters.ConfigStructFromJSONString(agentResource.Spec.Config)
if err != nil {
log.Error(err, "Failed to unmarshal agent configuration")
return false
}
if conf.Logs != nil && conf.Logs.LogMetricsCollected != nil && conf.Logs.LogMetricsCollected.Kubernetes != nil {
if conf.Logs.LogMetricsCollected.Kubernetes.EnhancedContainerInsights {
return !featureConfigExists || conf.Logs.LogMetricsCollected.Kubernetes.AcceleratedComputeMetrics
} else {
// enhanced container insights is disabled
return false
}
}
return false
}
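// getAmazonCloudWatchAgentResource fetches the default AmazonCloudWatchAgent
// custom resource; a failed lookup yields a zero-value resource. It is a
// package-level variable so it can be overridden, e.g. in tests.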
var getAmazonCloudWatchAgentResource = func(ctx context.Context, c client.Client) v1alpha1.AmazonCloudWatchAgent {
cr := &v1alpha1.AmazonCloudWatchAgent{}
_ = c.Get(ctx, client.ObjectKey{
Namespace: amazonCloudWatchNamespace,
Name: amazonCloudWatchAgentName,
}, cr)
return *cr
}