pkg/controller/apmserver/controller.go (286 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package apmserver import ( "context" "path/filepath" "reflect" "sync/atomic" "go.elastic.co/apm/v2" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" apmv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/apm/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/association" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/certificates" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/defaults" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/driver" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/finalizer" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/keystore" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/labels" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" ) const ( controllerName = "apmserver-controller" configHashAnnotationName = "apm.k8s.elastic.co/config-hash" // ApmBaseDir is the base directory of the APM server ApmBaseDir = "/usr/share/apm-server" ) var ( log = ulog.Log.WithName(controllerName) // ApmServerBin is the apm server binary file ApmServerBin = filepath.Join(ApmBaseDir, "apm-server") initContainerParameters = keystore.InitContainerParameters{ KeystoreCreateCommand: ApmServerBin + " keystore create --force", KeystoreAddCommand: ApmServerBin + ` keystore add "$key" --stdin < "$filename"`, SecureSettingsVolumeMountPath: keystore.SecureSettingsVolumeMountPath, KeystoreVolumePath: DataVolumePath, Resources: corev1.ResourceRequirements{ Requests: map[corev1.ResourceName]resource.Quantity{ corev1.ResourceMemory: resource.MustParse("128Mi"), corev1.ResourceCPU: resource.MustParse("100m"), }, Limits: map[corev1.ResourceName]resource.Quantity{ corev1.ResourceMemory: resource.MustParse("128Mi"), corev1.ResourceCPU: resource.MustParse("100m"), }, }, } ) // Add creates a new ApmServer Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller // and Start it when the Manager is Started. func Add(mgr manager.Manager, params operator.Parameters) error { reconciler := newReconciler(mgr, params) c, err := common.NewController(mgr, controllerName, reconciler, params) if err != nil { return err } return addWatches(mgr, c, reconciler) } // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileApmServer { return &ReconcileApmServer{ Client: mgr.GetClient(), recorder: mgr.GetEventRecorderFor(controllerName), dynamicWatches: watches.NewDynamicWatches(), Parameters: params, } } func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileApmServer) error { // Watch for changes to ApmServer err := c.Watch(source.Kind(mgr.GetCache(), &apmv1.ApmServer{}, &handler.TypedEnqueueRequestForObject[*apmv1.ApmServer]{})) if err != nil { return err } // Watch Deployments if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}, handler.TypedEnqueueRequestForOwner[*appsv1.Deployment]( mgr.GetScheme(), mgr.GetRESTMapper(), &apmv1.ApmServer{}, handler.OnlyControllerOwner(), ))); err != nil { return err } // Watch Pods, to ensure `status.version` and version upgrades are correctly reconciled on any change. // Watching Deployments only may lead to missing some events. if err := watches.WatchPods(mgr, c, ApmServerNameLabelName); err != nil { return err } // Watch services if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}, handler.TypedEnqueueRequestForOwner[*corev1.Service]( mgr.GetScheme(), mgr.GetRESTMapper(), &apmv1.ApmServer{}, handler.OnlyControllerOwner(), ))); err != nil { return err } // Watch owned and soft-owned secrets if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}, handler.TypedEnqueueRequestForOwner[*corev1.Secret]( mgr.GetScheme(), mgr.GetRESTMapper(), &apmv1.ApmServer{}, handler.OnlyControllerOwner(), ))); err != nil { return err } if err := watches.WatchSoftOwnedSecrets(mgr, c, apmv1.Kind); err != nil { return err } // dynamically watch referenced secrets to connect to Elasticsearch return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}, r.dynamicWatches.Secrets)) } var _ reconcile.Reconciler = &ReconcileApmServer{} // ReconcileApmServer reconciles an ApmServer object type ReconcileApmServer struct { k8s.Client recorder record.EventRecorder dynamicWatches watches.DynamicWatches operator.Parameters // iteration is the number of times this controller has run its reconcile method iteration uint64 } // K8sClient returns the kubernetes client from the APM Server reconciler. func (r *ReconcileApmServer) K8sClient() k8s.Client { return r.Client } // DynamicWatches returns the set of dynamic watches from the APM Server reconciler. func (r *ReconcileApmServer) DynamicWatches() watches.DynamicWatches { return r.dynamicWatches } // Recorder returns the Kubernetes recorder that is responsible for recording and reporting // events from the APM Server reconciler. func (r *ReconcileApmServer) Recorder() record.EventRecorder { return r.recorder } var _ driver.Interface = &ReconcileApmServer{} // Reconcile reads that state of the cluster for a ApmServer object and makes changes based on the state read // and what is in the ApmServer.Spec func (r *ReconcileApmServer) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { ctx = common.NewReconciliationContext(ctx, &r.iteration, r.Tracer, controllerName, "as_name", request) defer common.LogReconciliationRun(ulog.FromContext(ctx))() defer tracing.EndContextTransaction(ctx) var as apmv1.ApmServer if err := r.Client.Get(ctx, request.NamespacedName, &as); err != nil { if apierrors.IsNotFound(err) { return reconcile.Result{}, r.onDelete(ctx, types.NamespacedName{ Namespace: request.Namespace, Name: request.Name, }) } return reconcile.Result{}, tracing.CaptureError(ctx, err) } if common.IsUnmanaged(ctx, &as) { log.Info("Object currently not managed by this controller. Skipping reconciliation", "namespace", as.Namespace, "as_name", as.Name) return reconcile.Result{}, nil } // Remove any previous finalizer used in ECK v1.0.0-beta1 that we don't need anymore if err := finalizer.RemoveAll(ctx, r.Client, &as); err != nil { return reconcile.Result{}, err } if as.IsMarkedForDeletion() { // APM server will be deleted, clean up resources return reconcile.Result{}, r.onDelete(ctx, k8s.ExtractNamespacedName(&as)) } results, state := r.doReconcile(ctx, &as) return results.WithError(r.updateStatus(ctx, state)).Aggregate() } func (r *ReconcileApmServer) doReconcile(ctx context.Context, as *apmv1.ApmServer) (*reconciler.Results, State) { state := NewState(as) results := reconciler.NewResult(ctx) areAssocsConfigured, err := association.AreConfiguredIfSet(ctx, as.GetAssociations(), r.recorder) if err != nil { return results.WithError(tracing.CaptureError(ctx, err)), state } if !areAssocsConfigured { return results, state } // Run validation in case the webhook is disabled if err := r.validate(ctx, as); err != nil { return results.WithError(err), state } svc, err := common.ReconcileService(ctx, r.Client, NewService(*as), as) if err != nil { return results.WithError(err), state } _, results = certificates.Reconciler{ K8sClient: r.K8sClient(), DynamicWatches: r.DynamicWatches(), Owner: as, TLSOptions: as.Spec.HTTP.TLS, Namer: Namer, Labels: as.GetIdentityLabels(), Services: []corev1.Service{*svc}, GlobalCA: r.GlobalCA, CACertRotation: r.CACertRotation, CertRotation: r.CertRotation, GarbageCollectSecrets: true, }.ReconcileCAAndHTTPCerts(ctx) if results.HasError() { _, err := results.Aggregate() k8s.MaybeEmitErrorEvent(r.recorder, err, as, events.EventReconciliationError, "Certificate reconciliation error: %v", err) return results, state } asVersion, err := version.Parse(as.Spec.Version) if err != nil { return results.WithError(err), state } logger := log.WithValues("namespace", as.Namespace, "as_name", as.Name) assocAllowed, err := association.AllowVersion(asVersion, as, logger, r.recorder) if err != nil { return results.WithError(tracing.CaptureError(ctx, err)), state } if !assocAllowed { return results, state // will eventually retry } state, err = r.reconcileApmServerDeployment(ctx, state, as, asVersion) if err != nil { if apierrors.IsConflict(err) { log.V(1).Info("Conflict while updating status") return results.WithResult(reconcile.Result{Requeue: true}), state } k8s.MaybeEmitErrorEvent(r.recorder, err, as, events.EventReconciliationError, "Deployment reconciliation error: %v", err) return results.WithError(tracing.CaptureError(ctx, err)), state } state.UpdateApmServerExternalService(*svc) _, err = results.WithError(err).Aggregate() k8s.MaybeEmitErrorEvent(r.recorder, err, as, events.EventReconciliationError, "Reconciliation error: %v", err) return results, state } func (r *ReconcileApmServer) validate(ctx context.Context, as *apmv1.ApmServer) error { span, vctx := apm.StartSpan(ctx, "validate", tracing.SpanTypeApp) defer span.End() if _, err := as.ValidateCreate(); err != nil { log.Error(err, "Validation failed") k8s.MaybeEmitErrorEvent(r.recorder, err, as, events.EventReasonValidation, err.Error()) return tracing.CaptureError(vctx, err) } return nil } func (r *ReconcileApmServer) onDelete(ctx context.Context, obj types.NamespacedName) error { // Clean up watches set on secure settings r.dynamicWatches.Secrets.RemoveHandlerForKey(keystore.SecureSettingsWatchName(obj)) // Clean up watches set on custom http tls certificates r.dynamicWatches.Secrets.RemoveHandlerForKey(certificates.CertificateWatchKey(Namer, obj.Name)) return reconciler.GarbageCollectSoftOwnedSecrets(ctx, r.Client, obj, apmv1.Kind) } // reconcileApmServerToken reconciles a Secret containing the APM Server token. // It reuses the existing token if possible. func reconcileApmServerToken(ctx context.Context, c k8s.Client, as *apmv1.ApmServer) (corev1.Secret, error) { expectedApmServerSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: as.Namespace, Name: SecretToken(as.Name), Labels: labels.AddCredentialsLabel(as.GetIdentityLabels()), }, Data: make(map[string][]byte), } // reuse the secret token if it already exists var existingSecret corev1.Secret err := c.Get(ctx, k8s.ExtractNamespacedName(&expectedApmServerSecret), &existingSecret) if err != nil && !apierrors.IsNotFound(err) { return corev1.Secret{}, err } if token, exists := existingSecret.Data[SecretTokenKey]; exists { expectedApmServerSecret.Data[SecretTokenKey] = token } else { expectedApmServerSecret.Data[SecretTokenKey] = common.RandomBytes(24) } // Don't set an ownerRef for the APM token secret, likely to be copied into different namespaces. // See https://github.com/elastic/cloud-on-k8s/issues/3986. return reconciler.ReconcileSecretNoOwnerRef(ctx, c, expectedApmServerSecret, as) } func (r *ReconcileApmServer) updateStatus(ctx context.Context, state State) error { span, _ := apm.StartSpan(ctx, "update_status", tracing.SpanTypeApp) defer span.End() original := state.originalApmServer if reflect.DeepEqual(original.Status, state.ApmServer.Status) { return nil } if state.ApmServer.Status.IsDegraded(original.Status.DeploymentStatus) { r.recorder.Event(original, corev1.EventTypeWarning, events.EventReasonUnhealthy, "Apm Server health degraded") } log.V(1).Info("Updating status", "iteration", atomic.LoadUint64(&r.iteration), "namespace", state.ApmServer.Namespace, "as_name", state.ApmServer.Name, "status", state.ApmServer.Status, ) return common.UpdateStatus(ctx, r.Client, state.ApmServer) } // NewService returns the service used by the APM Server. func NewService(as apmv1.ApmServer) *corev1.Service { svc := corev1.Service{ ObjectMeta: as.Spec.HTTP.Service.ObjectMeta, Spec: as.Spec.HTTP.Service.Spec, } svc.ObjectMeta.Namespace = as.Namespace svc.ObjectMeta.Name = HTTPService(as.Name) labels := as.GetIdentityLabels() ports := []corev1.ServicePort{ { Name: as.Spec.HTTP.Protocol(), Protocol: corev1.ProtocolTCP, Port: HTTPPort, }, } return defaults.SetServiceDefaults(&svc, labels, labels, ports) }