controllers/amazoncloudwatchagent_controller.go (146 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 // Package controllers contains the main controller, where the reconciliation starts. package controllers import ( "context" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1" "github.com/aws/amazon-cloudwatch-agent-operator/internal/config" "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests" "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/manifestutils" collectorStatus "github.com/aws/amazon-cloudwatch-agent-operator/internal/status/collector" ) // AmazonCloudWatchAgentReconciler reconciles a AmazonCloudWatchAgent object. type AmazonCloudWatchAgentReconciler struct { client.Client recorder record.EventRecorder scheme *runtime.Scheme log logr.Logger config config.Config } // Params is the set of options to build a new AmazonCloudWatchAgentReconciler. type Params struct { client.Client Recorder record.EventRecorder Scheme *runtime.Scheme Log logr.Logger Config config.Config } func (r *AmazonCloudWatchAgentReconciler) findCloudWatchAgentOwnedObjects(ctx context.Context, owner v1alpha1.AmazonCloudWatchAgent) (map[types.UID]client.Object, error) { // Define a map to store the owned objects ownedObjects := make(map[types.UID]client.Object) selector := manifestutils.SelectorLabelsForAllOperatorManaged(owner.ObjectMeta) listOps := &client.ListOptions{ Namespace: owner.Namespace, LabelSelector: labels.SelectorFromSet(selector), } // Define lists for different Kubernetes resources configMapList := &corev1.ConfigMapList{} serviceList := &corev1.ServiceList{} serviceAccountList := &corev1.ServiceAccountList{} deploymentList := &appsv1.DeploymentList{} statefulSetList := &appsv1.StatefulSetList{} daemonSetList := &appsv1.DaemonSetList{} var err error // List ConfigMaps err = r.List(ctx, configMapList, listOps) if err != nil { return nil, err } for i := range configMapList.Items { ownedObjects[configMapList.Items[i].GetUID()] = &configMapList.Items[i] } // List Services err = r.List(ctx, serviceList, listOps) if err != nil { return nil, err } for i := range serviceList.Items { ownedObjects[serviceList.Items[i].GetUID()] = &serviceList.Items[i] } // List ServiceAccounts err = r.List(ctx, serviceAccountList, listOps) if err != nil { return nil, err } for i := range serviceAccountList.Items { ownedObjects[serviceAccountList.Items[i].GetUID()] = &serviceAccountList.Items[i] } // List Deployments err = r.List(ctx, deploymentList, listOps) if err != nil { return nil, err } for i := range deploymentList.Items { ownedObjects[deploymentList.Items[i].GetUID()] = &deploymentList.Items[i] } // List StatefulSets err = r.List(ctx, statefulSetList, listOps) if err != nil { return nil, err } for i := range statefulSetList.Items { ownedObjects[statefulSetList.Items[i].GetUID()] = &statefulSetList.Items[i] } // List DaemonSets err = r.List(ctx, daemonSetList, listOps) if err != nil { return nil, err } for i := range daemonSetList.Items { ownedObjects[daemonSetList.Items[i].GetUID()] = &daemonSetList.Items[i] } return ownedObjects, nil } func (r *AmazonCloudWatchAgentReconciler) getParams(instance v1alpha1.AmazonCloudWatchAgent) manifests.Params { return manifests.Params{ Config: r.config, Client: r.Client, OtelCol: instance, Log: r.log, Scheme: r.scheme, Recorder: r.recorder, } } // NewReconciler creates a new reconciler for AmazonCloudWatchAgent objects. func NewReconciler(p Params) *AmazonCloudWatchAgentReconciler { r := &AmazonCloudWatchAgentReconciler{ Client: p.Client, log: p.Log, scheme: p.Scheme, config: p.Config, recorder: p.Recorder, } return r } // +kubebuilder:rbac:groups="",resources=pods;configmaps;services;serviceaccounts,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // +kubebuilder:rbac:groups=apps,resources=daemonsets;deployments;statefulsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update // +kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors;podmonitors,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=route.openshift.io,resources=routes;routes/custom-host,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=cloudwatch.aws.amazon.com,resources=amazoncloudwatchagents,verbs=get;list;watch;update;patch // +kubebuilder:rbac:groups=cloudwatch.aws.amazon.com,resources=amazoncloudwatchagents/status,verbs=get;update;patch // +kubebuilder:rbac:groups=cloudwatch.aws.amazon.com,resources=amazoncloudwatchagents/finalizers,verbs=get;update;patch // Reconcile the current state of an OpenTelemetry collector resource with the desired state. func (r *AmazonCloudWatchAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.log.WithValues("amazoncloudwatchagent", req.NamespacedName) var instance v1alpha1.AmazonCloudWatchAgent if err := r.Get(ctx, req.NamespacedName, &instance); err != nil { if !apierrors.IsNotFound(err) { log.Error(err, "unable to fetch AmazonCloudWatchAgent") } // we'll ignore not-found errors, since they can't be fixed by an immediate // requeue (we'll need to wait for a new notification), and we can get them // on deleted requests. return ctrl.Result{}, client.IgnoreNotFound(err) } // We have a deletion, short circuit and let the deletion happen if deletionTimestamp := instance.GetDeletionTimestamp(); deletionTimestamp != nil { return ctrl.Result{}, nil } if instance.Spec.ManagementState == v1alpha1.ManagementStateUnmanaged { log.Info("Skipping reconciliation for unmanaged AmazonCloudWatchAgent resource", "name", req.String()) // Stop requeueing for unmanaged AmazonCloudWatchAgent custom resources return ctrl.Result{}, nil } params := r.getParams(instance) desiredObjects, buildErr := BuildCollector(params) if buildErr != nil { return ctrl.Result{}, buildErr } err := reconcileDesiredObjectsWPrune(ctx, r.Client, log, params.OtelCol, params.Scheme, desiredObjects, r.findCloudWatchAgentOwnedObjects) return collectorStatus.HandleReconcileStatus(ctx, log, params, err) } // SetupWithManager tells the manager what our controller is interested in. func (r *AmazonCloudWatchAgentReconciler) SetupWithManager(mgr ctrl.Manager) error { builder := ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.AmazonCloudWatchAgent{}). Owns(&corev1.ConfigMap{}). Owns(&corev1.ServiceAccount{}). Owns(&corev1.Service{}). Owns(&appsv1.Deployment{}). Owns(&appsv1.DaemonSet{}). Owns(&appsv1.StatefulSet{}) return builder.Complete(r) }