pkg/controller/maps/controller.go (331 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package maps
import (
"context"
"fmt"
"hash/fnv"
"reflect"
"sync/atomic"
"time"
"go.elastic.co/apm/v2"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
emsv1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/maps/v1alpha1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/association"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
commonassociation "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/association"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/certificates"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/defaults"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/deployment"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/driver"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/license"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/maps"
)
// controllerName identifies this controller; it is handed to common.NewController,
// to the manager's event recorder and to the reconciliation context.
const controllerName = "maps-controller"
// Add registers the Elastic Maps Server controller with the given Manager: it builds the
// reconciler, creates the controller through common.NewController and installs its watches.
// The Manager starts the controller when the Manager itself is started.
func Add(mgr manager.Manager, params operator.Parameters) error {
	r := newReconciler(mgr, params)

	c, err := common.NewController(mgr, controllerName, r, params)
	if err != nil {
		return err
	}

	return addWatches(mgr, c, r)
}
// newReconciler builds a ReconcileMapsServer backed by the Manager's client and event recorder.
// The license checker looks up licenses in the operator namespace given in params.
func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileMapsServer {
	k8sClient := mgr.GetClient()

	r := ReconcileMapsServer{
		Client:         k8sClient,
		Parameters:     params,
		recorder:       mgr.GetEventRecorderFor(controllerName),
		dynamicWatches: watches.NewDynamicWatches(),
		licenseChecker: license.NewLicenseChecker(k8sClient, params.OperatorNamespace),
	}
	return &r
}
// addWatches registers on the controller every event source that triggers a reconciliation
// of an ElasticMapsServer resource. It returns the first registration error encountered.
func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileMapsServer) error {
	// the ElasticMapsServer resources themselves
	emsSource := source.Kind(
		mgr.GetCache(),
		&emsv1alpha1.ElasticMapsServer{},
		&handler.TypedEnqueueRequestForObject[*emsv1alpha1.ElasticMapsServer]{},
	)
	if err := c.Watch(emsSource); err != nil {
		return err
	}

	// Deployments whose controller owner is an ElasticMapsServer
	deploymentSource := source.Kind(
		mgr.GetCache(),
		&appsv1.Deployment{},
		handler.TypedEnqueueRequestForOwner[*appsv1.Deployment](
			mgr.GetScheme(), mgr.GetRESTMapper(),
			&emsv1alpha1.ElasticMapsServer{}, handler.OnlyControllerOwner(),
		),
	)
	if err := c.Watch(deploymentSource); err != nil {
		return err
	}

	// Pods are watched as well so that `status.version` and version upgrades are reconciled
	// on any change: relying on Deployment events alone may miss some of them.
	if err := watches.WatchPods(mgr, c, NameLabelName); err != nil {
		return err
	}

	// Services whose controller owner is an ElasticMapsServer
	serviceSource := source.Kind(
		mgr.GetCache(),
		&corev1.Service{},
		handler.TypedEnqueueRequestForOwner[*corev1.Service](
			mgr.GetScheme(), mgr.GetRESTMapper(),
			&emsv1alpha1.ElasticMapsServer{}, handler.OnlyControllerOwner(),
		),
	)
	if err := c.Watch(serviceSource); err != nil {
		return err
	}

	// Secrets whose controller owner is an ElasticMapsServer...
	ownedSecretSource := source.Kind(
		mgr.GetCache(),
		&corev1.Secret{},
		handler.TypedEnqueueRequestForOwner[*corev1.Secret](
			mgr.GetScheme(), mgr.GetRESTMapper(),
			&emsv1alpha1.ElasticMapsServer{}, handler.OnlyControllerOwner(),
		),
	)
	if err := c.Watch(ownedSecretSource); err != nil {
		return err
	}
	// ...and the soft-owned ones
	if err := watches.WatchSoftOwnedSecrets(mgr, c, emsv1alpha1.Kind); err != nil {
		return err
	}

	// Secrets referenced to connect to Elasticsearch are watched dynamically: the handlers
	// are added to and removed from r.dynamicWatches.Secrets at runtime.
	referencedSecretSource := source.Kind(mgr.GetCache(), &corev1.Secret{}, r.dynamicWatches.Secrets)
	return c.Watch(referencedSecretSource)
}
// compile-time check: *ReconcileMapsServer implements reconcile.Reconciler
var _ reconcile.Reconciler = (*ReconcileMapsServer)(nil)

// ReconcileMapsServer reconciles ElasticMapsServer objects.
type ReconcileMapsServer struct {
	// Client is the Kubernetes client used to read and update resources.
	k8s.Client
	// Parameters are the operator parameters this controller was created with.
	operator.Parameters

	// recorder emits Kubernetes events about the reconciled resources.
	recorder record.EventRecorder
	// dynamicWatches holds the watches whose handlers are registered and removed at runtime.
	dynamicWatches watches.DynamicWatches
	// licenseChecker reports whether enterprise features are enabled.
	licenseChecker license.Checker

	// iteration is the number of times this controller has run its Reconcile method
	iteration uint64
}
// K8sClient returns the Kubernetes client embedded in the reconciler.
func (r *ReconcileMapsServer) K8sClient() k8s.Client {
	return r.Client
}
// DynamicWatches returns the dynamic watches held by the reconciler.
func (r *ReconcileMapsServer) DynamicWatches() watches.DynamicWatches {
	return r.dynamicWatches
}
// Recorder returns the event recorder used by the reconciler.
func (r *ReconcileMapsServer) Recorder() record.EventRecorder {
	return r.recorder
}
// compile-time check: *ReconcileMapsServer implements driver.Interface
var _ driver.Interface = (*ReconcileMapsServer)(nil)
// Reconcile reads the state of the cluster for the ElasticMapsServer object named in the
// request and makes changes based on that state and on the object's Spec.
//
// A missing object or one marked for deletion only triggers the cleanup done by onDelete;
// an unmanaged object is skipped. Otherwise the main reconciliation runs and the resulting
// status is persisted. A conflict while updating the status leads to a requeue instead of
// an error.
func (r *ReconcileMapsServer) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	ctx = common.NewReconciliationContext(ctx, &r.iteration, r.Tracer, controllerName, "maps_name", request)
	defer common.LogReconciliationRun(ulog.FromContext(ctx))()
	defer tracing.EndContextTransaction(ctx)

	// fetch the ElasticMapsServer object targeted by the request
	var ems emsv1alpha1.ElasticMapsServer
	switch err := r.Client.Get(ctx, request.NamespacedName, &ems); {
	case err == nil:
		// found, carry on
	case apierrors.IsNotFound(err):
		// the object is gone: only the cleanup remains to be done
		return reconcile.Result{}, r.onDelete(ctx, request.NamespacedName)
	default:
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	if common.IsUnmanaged(ctx, &ems) {
		ulog.FromContext(ctx).Info("Object is currently not managed by this controller. Skipping reconciliation", "namespace", ems.Namespace, "maps_name", ems.Name)
		return reconcile.Result{}, nil
	}

	// the object is being deleted: nothing to do other than removing the watches
	if ems.IsMarkedForDeletion() {
		return reconcile.Result{}, r.onDelete(ctx, k8s.ExtractNamespacedName(&ems))
	}

	// main reconciliation logic, followed by the status update
	results, status := r.doReconcile(ctx, ems)
	updateErr := r.updateStatus(ctx, ems, status)
	switch {
	case updateErr == nil:
		// status is up to date
	case apierrors.IsConflict(updateErr):
		// the object changed in the meantime: try again rather than reporting an error
		return results.WithResult(reconcile.Result{Requeue: true}).Aggregate()
	default:
		results.WithError(updateErr)
	}
	return results.Aggregate()
}
// doReconcile runs the reconciliation steps for ems in sequence and stops at the first step
// that fails or that cannot proceed yet: license check, association check, validation,
// Service, certificates, version and association version check, configuration, Deployment
// and finally status computation.
//
// It returns the accumulated results together with the status to persist. Until the last
// step succeeds, that status is the current one stamped with the observed generation.
func (r *ReconcileMapsServer) doReconcile(ctx context.Context, ems emsv1alpha1.ElasticMapsServer) (*reconciler.Results, emsv1alpha1.MapsStatus) {
	log := ulog.FromContext(ctx)
	results := reconciler.NewResult(ctx)
	status := newStatus(ems)

	// Elastic Maps Server is only reconciled when enterprise features are enabled
	licensed, err := r.licenseChecker.EnterpriseFeaturesEnabled(ctx)
	switch {
	case err != nil:
		return results.WithError(err), status
	case !licensed:
		msg := "Elastic Maps Server is an enterprise feature. Enterprise features are disabled"
		log.Info(msg, "namespace", ems.Namespace, "maps_name", ems.Name)
		r.recorder.Eventf(&ems, corev1.EventTypeWarning, events.EventReconciliationError, msg)
		// there is no good way of watching for a license level change, so come back after
		// a reasonably long delay
		return results.WithResult(reconcile.Result{Requeue: true, RequeueAfter: 5 * time.Minute}), status
	}

	// stop here for as long as IsConfiguredIfSet reports the association as not configured
	assocReady, err := association.IsConfiguredIfSet(ctx, &ems, r.recorder)
	if err != nil {
		return results.WithError(err), status
	}
	if !assocReady {
		return results, status
	}

	// validate here as well, in case the webhook is disabled
	if err := r.validate(ctx, ems); err != nil {
		return results.WithError(err), status
	}

	httpSvc, err := common.ReconcileService(ctx, r.Client, NewService(ems), &ems)
	if err != nil {
		return results.WithError(err), status
	}

	// CA and HTTP certificates; from here on results are the ones of the certificate reconciliation
	certReconciler := certificates.Reconciler{
		K8sClient:             r.K8sClient(),
		DynamicWatches:        r.DynamicWatches(),
		Owner:                 &ems,
		TLSOptions:            ems.Spec.HTTP.TLS,
		Namer:                 EMSNamer,
		Labels:                ems.GetIdentityLabels(),
		Services:              []corev1.Service{*httpSvc},
		GlobalCA:              r.GlobalCA,
		CACertRotation:        r.CACertRotation,
		CertRotation:          r.CertRotation,
		GarbageCollectSecrets: true,
	}
	_, results = certReconciler.ReconcileCAAndHTTPCerts(ctx)
	if results.HasError() {
		_, certErr := results.Aggregate()
		k8s.MaybeEmitErrorEvent(r.recorder, certErr, &ems, events.EventReconciliationError, "Certificate reconciliation error: %v", certErr)
		return results, status
	}

	parsedVersion, err := version.Parse(ems.Spec.Version)
	if err != nil {
		return results.WithError(err), status
	}
	versionAllowed, err := association.AllowVersion(parsedVersion, ems.Associated(), log, r.recorder)
	if err != nil {
		return results.WithError(err), status
	}
	if !versionAllowed {
		// a retry eventually happens once the resource is updated, and the results of the
		// certificate reconciliation also carry a retry after a time period
		return results, status
	}

	cfgSecret, err := reconcileConfig(ctx, r, ems, r.IPFamily)
	if err != nil {
		return results.WithError(err), status
	}

	// hash of the inputs whose change must rotate the Pods
	hash, err := buildConfigHash(r.K8sClient(), ems, cfgSecret)
	if err != nil {
		return results.WithError(fmt.Errorf("build config hash: %w", err)), status
	}

	reconciledDeployment, err := r.reconcileDeployment(ctx, ems, hash)
	if err != nil {
		return results.WithError(fmt.Errorf("reconcile deployment: %w", err)), status
	}

	status, err = r.getStatus(ctx, ems, reconciledDeployment)
	if err != nil {
		return results.WithError(fmt.Errorf("calculating status: %w", err)), status
	}
	return results, status
}
// newStatus returns the current status of ems with ObservedGeneration set to the
// generation of the object being reconciled.
func newStatus(ems emsv1alpha1.ElasticMapsServer) emsv1alpha1.MapsStatus {
	// ems is received by value: stamping its status leaves the caller's object untouched
	ems.Status.ObservedGeneration = ems.Generation
	return ems.Status
}
// validate runs the creation validation of ems inside a "validate" APM span.
// On failure the error is logged, reported through a Kubernetes event on the resource,
// captured in the tracing context and returned. It returns nil when the resource is valid.
func (r *ReconcileMapsServer) validate(ctx context.Context, ems emsv1alpha1.ElasticMapsServer) error {
	span, vctx := apm.StartSpan(ctx, "validate", tracing.SpanTypeApp)
	defer span.End()

	if _, err := ems.ValidateCreate(); err != nil {
		ulog.FromContext(ctx).Error(err, "Validation failed")
		// The event message is a format string: pass the error text as an argument so that
		// any '%' it contains is emitted verbatim instead of being interpreted as a verb.
		k8s.MaybeEmitErrorEvent(r.recorder, err, &ems, events.EventReasonValidation, "%s", err.Error())
		return tracing.CaptureError(vctx, err)
	}
	return nil
}
// NewService returns the expected HTTP Service for ems. It starts from the Service
// metadata and spec given in ems.Spec.HTTP.Service, forces the namespace and name, and
// applies the defaults with the identity labels of ems (used both as labels and as
// selector) and a single TCP port named after the HTTP protocol in use.
func NewService(ems emsv1alpha1.ElasticMapsServer) *corev1.Service {
	svc := &corev1.Service{
		ObjectMeta: ems.Spec.HTTP.Service.ObjectMeta,
		Spec:       ems.Spec.HTTP.Service.Spec,
	}
	svc.Namespace = ems.Namespace
	svc.Name = HTTPService(ems.Name)

	identityLabels := ems.GetIdentityLabels()
	httpPort := corev1.ServicePort{
		Name:     ems.Spec.HTTP.Protocol(),
		Protocol: corev1.ProtocolTCP,
		Port:     HTTPPort,
	}

	return defaults.SetServiceDefaults(svc, identityLabels, identityLabels, []corev1.ServicePort{httpPort})
}
// buildConfigHash returns a 32-bit FNV-1a hash, formatted as a decimal string, of the
// settings whose change must rotate the Pods:
//   - the content of the Elastic Maps Server configuration file,
//   - the Elastic Maps Server TLS certificate, when TLS is enabled,
//   - the associations of ems, as written by commonassociation.WriteAssocsToConfigHash.
//
// An error is returned if the TLS certificates Secret cannot be read or if the
// associations cannot be written to the hash.
func buildConfigHash(c k8s.Client, ems emsv1alpha1.ElasticMapsServer, configSecret corev1.Secret) (string, error) {
	hasher := fnv.New32a()

	// configuration file content
	_, _ = hasher.Write(configSecret.Data[ConfigFilename])

	// Elastic Maps Server TLS certificate
	if ems.Spec.HTTP.TLS.Enabled() {
		certSecretKey := types.NamespacedName{
			Namespace: ems.Namespace,
			Name:      certificates.InternalCertsSecretName(EMSNamer, ems.Name),
		}
		var certSecret corev1.Secret
		// NOTE(review): no context is passed to this function, hence context.Background();
		// this lookup is therefore not bound to the context of the reconciliation.
		if err := c.Get(context.Background(), certSecretKey, &certSecret); err != nil {
			return "", err
		}
		if pem, found := certSecret.Data[certificates.CertFileName]; found {
			_, _ = hasher.Write(pem)
		}
	}

	// associated Elasticsearch TLS certificates
	if err := commonassociation.WriteAssocsToConfigHash(c, ems.GetAssociations(), hasher); err != nil {
		return "", err
	}

	sum := hasher.Sum32()
	return fmt.Sprint(sum), nil
}
// reconcileDeployment builds the expected Deployment of ems from deploymentParams and
// reconciles it through deployment.Reconcile, returning the resulting Deployment.
// configHash is forwarded to deploymentParams. The work is recorded in a
// "reconcile_deployment" APM span.
func (r *ReconcileMapsServer) reconcileDeployment(
	ctx context.Context,
	ems emsv1alpha1.ElasticMapsServer,
	configHash string,
) (appsv1.Deployment, error) {
	// Keep the context returned by StartSpan: it carries the new span, so anything traced
	// further down is attached to "reconcile_deployment" rather than to its parent.
	span, ctx := apm.StartSpan(ctx, "reconcile_deployment", tracing.SpanTypeApp)
	defer span.End()

	deployParams, err := r.deploymentParams(ems, configHash)
	if err != nil {
		return appsv1.Deployment{}, err
	}

	deploy := deployment.New(deployParams)
	return deployment.Reconcile(ctx, r.K8sClient(), deploy, &ems)
}
// deploymentParams computes the parameters of the Deployment expected for ems: the pod
// template built by newPodSpec from ems and configHash, the identity labels of ems as
// Deployment labels and selector, the replica count from the spec and a rolling update strategy.
func (r *ReconcileMapsServer) deploymentParams(ems emsv1alpha1.ElasticMapsServer, configHash string) (deployment.Params, error) {
	podTemplate, err := newPodSpec(ems, configHash)
	if err != nil {
		return deployment.Params{}, err
	}

	identityLabels := ems.GetIdentityLabels()

	// identity and version labels are merged with the labels already set on the pod
	// template (the user-provided ones)
	managedPodLabels := maps.Merge(ems.GetIdentityLabels(), versionLabels(ems))
	podTemplate.Labels = maps.MergePreservingExistingKeys(podTemplate.Labels, managedPodLabels)

	params := deployment.Params{
		Name:            Deployment(ems.Name),
		Namespace:       ems.Namespace,
		Replicas:        ems.Spec.Count,
		Selector:        identityLabels,
		Labels:          identityLabels,
		PodTemplateSpec: podTemplate,
		Strategy:        appsv1.DeploymentStrategy{Type: appsv1.RollingUpdateDeploymentStrategyType},
	}
	return params, nil
}
// getStatus computes the status of ems from the given Deployment and from the Pods
// carrying the name label of ems in its namespace. On error, the current status stamped
// with the observed generation is returned along with the error.
func (r *ReconcileMapsServer) getStatus(ctx context.Context, ems emsv1alpha1.ElasticMapsServer, deploy appsv1.Deployment) (emsv1alpha1.MapsStatus, error) {
	status := newStatus(ems)

	podSelector := map[string]string{NameLabelName: ems.Name}
	pods, err := k8s.PodsMatchingLabels(r.K8sClient(), ems.Namespace, podSelector)
	if err != nil {
		return status, err
	}

	current := ems.Status
	deploymentStatus, err := common.DeploymentStatus(ctx, current.DeploymentStatus, deploy, pods, versionLabelName)
	if err != nil {
		return status, err
	}

	status.DeploymentStatus = deploymentStatus
	// the association status is carried over unchanged from the current status
	status.AssociationStatus = current.AssociationStatus
	return status, nil
}
// updateStatus persists status on ems through common.UpdateStatus, unless it is deeply
// equal to the current status. A warning event is emitted when the new status is
// degraded compared to the current deployment status.
func (r *ReconcileMapsServer) updateStatus(ctx context.Context, ems emsv1alpha1.ElasticMapsServer, status emsv1alpha1.MapsStatus) error {
	previous := ems.Status
	if reflect.DeepEqual(status, previous) {
		// nothing changed, no update needed
		return nil
	}

	if status.IsDegraded(previous.DeploymentStatus) {
		r.recorder.Event(&ems, corev1.EventTypeWarning, events.EventReasonUnhealthy, "Elastic Maps Server health degraded")
	}

	log := ulog.FromContext(ctx)
	log.V(1).Info("Updating status",
		"iteration", atomic.LoadUint64(&r.iteration),
		"namespace", ems.Namespace,
		"maps_name", ems.Name,
		"status", status,
	)

	ems.Status = status
	return common.UpdateStatus(ctx, r.Client, &ems)
}
// onDelete cleans up after the ElasticMapsServer identified by obj: it removes the
// dynamic Secret watch handlers registered for it and garbage collects its soft-owned
// Secrets, returning the error of that garbage collection, if any.
func (r *ReconcileMapsServer) onDelete(ctx context.Context, obj types.NamespacedName) error {
	// watch set on the custom HTTP TLS certificates
	certWatchKey := certificates.CertificateWatchKey(EMSNamer, obj.Name)
	r.dynamicWatches.Secrets.RemoveHandlerForKey(certWatchKey)

	// watch set on the configRef Secret
	configRefWatchKey := common.ConfigRefWatchName(obj)
	r.dynamicWatches.Secrets.RemoveHandlerForKey(configRefWatchKey)

	return reconciler.GarbageCollectSoftOwnedSecrets(ctx, r.Client, obj, emsv1alpha1.Kind)
}