pkg/controller/elasticsearch/elasticsearch_controller.go (283 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package elasticsearch
import (
"context"
"reflect"
"sync/atomic"
pkgerrors "github.com/pkg/errors"
"go.elastic.co/apm/v2"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/certificates"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/finalizer"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/keystore"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/license"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
commonversion "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/certificates/transport"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/driver"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/observer"
esreconcile "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/user"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/validation"
esversion "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/maps"
)
// name identifies this controller. It is passed to common.NewController, used to obtain the
// event recorder from the manager, and handed to the reconciliation context.
const name = "elasticsearch-controller"
// Add creates a new Elasticsearch Controller and registers it with the Manager with default RBAC.
// The Manager sets fields on the Controller and starts it when the Manager itself is started.
// Once the controller exists, all the watches the reconciler relies on are installed on it.
// This function is also called by cmd/main.go.
//
// It returns the error from controller creation or from the first watch that fails to register.
func Add(mgr manager.Manager, params operator.Parameters) error {
	r := newReconciler(mgr, params)

	ctrl, err := common.NewController(mgr, name, r, params)
	if err != nil {
		return err
	}

	return addWatches(mgr, ctrl, r)
}
// newReconciler returns a new ReconcileElasticsearch (a reconcile.Reconciler) built from the
// Manager's client and event recorder, the given operator parameters, and freshly created
// license checker, observers manager, dynamic watches and cluster expectations.
func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileElasticsearch {
	k8sClient := mgr.GetClient()

	r := &ReconcileElasticsearch{Client: k8sClient}
	r.recorder = mgr.GetEventRecorderFor(name)
	r.licenseChecker = license.NewLicenseChecker(k8sClient, params.OperatorNamespace)
	r.esObservers = observer.NewManager(params.ElasticsearchObservationInterval, params.Tracer)
	r.dynamicWatches = watches.NewDynamicWatches()
	r.expectations = expectations.NewClustersExpectations(k8sClient)
	r.Parameters = params

	return r
}
// addWatches registers on controller c every event source that must trigger a reconciliation:
// Elasticsearch resources, the StatefulSets, Services and Secrets they own, Pods carrying the
// cluster name label, dynamically watched ConfigMaps and Secrets, soft-owned Secrets, and the
// cluster health changes reported by the observers. Registration stops at the first error,
// which is returned as-is.
func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileElasticsearch) error {
	informers := mgr.GetCache()
	scheme := mgr.GetScheme()
	mapper := mgr.GetRESTMapper()

	// Elasticsearch resources enqueue themselves.
	esSource := source.Kind(informers, &esv1.Elasticsearch{}, &handler.TypedEnqueueRequestForObject[*esv1.Elasticsearch]{})
	if err := c.Watch(esSource); err != nil {
		return err
	}

	// StatefulSets enqueue the Elasticsearch resource that is their controller owner.
	statefulSetSource := source.Kind(
		informers,
		&appsv1.StatefulSet{},
		handler.TypedEnqueueRequestForOwner[*appsv1.StatefulSet](scheme, mapper, &esv1.Elasticsearch{}, handler.OnlyControllerOwner()),
	)
	if err := c.Watch(statefulSetSource); err != nil {
		return err
	}

	// Pods belonging to ES clusters, identified through the cluster name label.
	if err := watches.WatchPods(mgr, c, label.ClusterNameLabelName); err != nil {
		return err
	}

	// Services enqueue the Elasticsearch resource that is their controller owner.
	serviceSource := source.Kind(
		informers,
		&corev1.Service{},
		handler.TypedEnqueueRequestForOwner[*corev1.Service](scheme, mapper, &esv1.Elasticsearch{}, handler.OnlyControllerOwner()),
	)
	if err := c.Watch(serviceSource); err != nil {
		return err
	}

	// ConfigMaps go through the dynamic watches (currently used for additional CAs trust).
	configMapSource := source.Kind(informers, &corev1.ConfigMap{}, r.dynamicWatches.ConfigMaps)
	if err := c.Watch(configMapSource); err != nil {
		return err
	}

	// Secrets go through the dynamic watches too; a handler for controller-owned Secrets is
	// added to them right away, followed by the watch on soft-owned Secrets.
	secretSource := source.Kind(informers, &corev1.Secret{}, r.dynamicWatches.Secrets)
	if err := c.Watch(secretSource); err != nil {
		return err
	}
	ownedSecrets := &watches.OwnerWatch[*corev1.Secret]{
		IsController: true,
		OwnerType:    &esv1.Elasticsearch{},
		Scheme:       scheme,
		Mapper:       mapper,
	}
	if err := r.dynamicWatches.Secrets.AddHandler(ownedSecrets); err != nil {
		return err
	}
	if err := watches.WatchSoftOwnedSecrets(mgr, c, esv1.Kind); err != nil {
		return err
	}

	// Trigger a reconciliation when observers report a cluster health change.
	healthChanges := observer.WatchClusterHealthChange(r.esObservers)
	return c.Watch(healthChanges)
}
// Compile-time check that *ReconcileElasticsearch satisfies reconcile.Reconciler.
var _ reconcile.Reconciler = (*ReconcileElasticsearch)(nil)
// ReconcileElasticsearch reconciles an Elasticsearch object.
// It embeds the Kubernetes client and the operator parameters, and holds the helpers
// created once per controller in newReconciler.
type ReconcileElasticsearch struct {
// Client is the Kubernetes client obtained from the manager; it is used to read and update resources.
k8s.Client
// Parameters are the operator parameters; the Tracer and ExposedNodeLabels used in this file come from them.
operator.Parameters
// recorder emits Kubernetes events on the reconciled Elasticsearch resources.
recorder record.EventRecorder
// licenseChecker is handed to the manifest validation and to the driver.
licenseChecker license.Checker
// esObservers manages the cluster observers; observation is stopped in onDelete, and a
// watch on reported cluster health changes is registered in addWatches.
esObservers *observer.Manager
// dynamicWatches holds the Secrets and ConfigMaps handlers whose watch keys are added and removed at runtime.
dynamicWatches watches.DynamicWatches
// expectations help dealing with inconsistencies in our client cache,
// by marking resources updates as expected, and skipping some operations if the cache is not up-to-date.
expectations *expectations.ClustersExpectation
// iteration is the number of times this controller has run its Reconcile method
iteration uint64
}
// Reconcile reads the state of the cluster for an Elasticsearch object and makes changes based on
// the state read and what is in the Elasticsearch.Spec.
//
// A pass goes through these steps:
//  1. fetch the resource (the pass ends early if it is gone or cannot be read);
//  2. skip resources flagged as unmanaged;
//  3. remove any finalizers still present on the resource;
//  4. run the actual reconciliation (internalReconcile);
//  5. update the orchestration hints annotations;
//  6. derive the phase and the ReconciliationComplete condition, then update the status.
//
// A conflict while updating annotations is turned into a requeue; a conflict while updating the
// status returns an immediate requeue without error. Any other error is aggregated into the result.
func (r *ReconcileElasticsearch) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	ctx = common.NewReconciliationContext(ctx, &r.iteration, r.Tracer, name, "es_name", request)
	logger := ulog.FromContext(ctx)
	defer common.LogReconciliationRun(logger)()
	defer tracing.EndContextTransaction(ctx)

	// Fetch the Elasticsearch instance. The boolean tells us to stop here even without an error.
	var es esv1.Elasticsearch
	stop, err := r.fetchElasticsearchWithAssociations(ctx, request, &es)
	if stop || err != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	if common.IsUnmanaged(ctx, &es) {
		logger.Info("Object is currently not managed by this controller. Skipping reconciliation", "namespace", es.Namespace, "es_name", es.Name)
		return reconcile.Result{}, nil
	}

	// Remove any previous finalizers.
	if err := finalizer.RemoveAll(ctx, r.Client, &es); err != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	state, err := esreconcile.NewState(es)
	if err != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}
	// ReconciliationComplete starts as True and stays so until another condition of the same type is reported.
	state.ReportCondition(esv1.ReconciliationComplete, corev1.ConditionTrue, "")

	results := r.internalReconcile(ctx, es, state)

	// Update orchestration related annotations.
	if annotateErr := r.annotateResource(ctx, es, state); annotateErr != nil {
		switch {
		case apierrors.IsConflict(annotateErr):
			logger.V(1).Info("Conflict while updating annotations", "namespace", es.Namespace, "es_name", es.Name)
			results.WithReconciliationState(reconciler.Requeue.WithReason("Conflict while updating annotations"))
		default:
			logger.Error(annotateErr, "Error while updating annotations", "namespace", es.Namespace, "es_name", es.Name)
			results.WithError(annotateErr)
			k8s.MaybeEmitErrorEvent(r.recorder, annotateErr, &es, events.EventReconciliationError, "Reconciliation error: %v", annotateErr)
		}
	}

	// Translate the aggregated results into a phase and, when incomplete, a False condition.
	if reconciled, reason := results.IsReconciled(); reconciled {
		state.UpdateWithPhase(esv1.ElasticsearchReadyPhase)
	} else {
		state.UpdateWithPhase(esv1.ElasticsearchApplyingChangesPhase)
		state.ReportCondition(esv1.ReconciliationComplete, corev1.ConditionFalse, reason)
	}

	// Last step of the reconciliation loop is always to update the Elasticsearch resource status.
	statusErr := r.updateStatus(ctx, es, state)
	switch {
	case statusErr == nil:
		// nothing to report
	case apierrors.IsConflict(statusErr):
		logger.V(1).Info("Conflict while updating status", "namespace", es.Namespace, "es_name", es.Name)
		return reconcile.Result{Requeue: true}, nil
	default:
		k8s.MaybeEmitErrorEvent(r.recorder, statusErr, &es, events.EventReconciliationError, "Reconciliation error: %v", statusErr)
	}

	return results.WithError(statusErr).Aggregate()
}
// fetchElasticsearchWithAssociations loads the Elasticsearch resource designated by the request
// into es, inside a "fetch_elasticsearch" tracing span.
//
// The returned boolean is false only when the resource was read successfully. It is true when:
//   - the resource does not exist: in-memory state is cleaned up through onDelete, whose error
//     (possibly nil) is returned. Children resources are garbage-collected either by the
//     operator (see `onDelete`) or by k8s through the ownerReference mechanism;
//   - the resource could not be read: the read error is returned so the request is requeued.
func (r *ReconcileElasticsearch) fetchElasticsearchWithAssociations(ctx context.Context, request reconcile.Request, es *esv1.Elasticsearch) (bool, error) {
	span, ctx := apm.StartSpan(ctx, "fetch_elasticsearch", tracing.SpanTypeApp)
	defer span.End()

	getErr := r.Client.Get(ctx, request.NamespacedName, es)
	switch {
	case getErr == nil:
		return false, nil
	case apierrors.IsNotFound(getErr):
		// Object not found: clean up what the operator tracks for it.
		return true, r.onDelete(ctx, request.NamespacedName)
	default:
		// Error reading the object - requeue the request.
		return true, getErr
	}
}
// internalReconcile performs the reconciliation of a fetched Elasticsearch resource and returns
// the aggregated results:
//   - a resource marked for deletion only goes through onDelete;
//   - an invalid manifest is reported on the reconcile state and ends the reconciliation without error;
//   - manifest warnings are logged and recorded as a warning event, then reconciliation continues;
//   - an unparsable or unsupported version is returned as an error;
//   - otherwise the work is delegated to the default driver.
func (r *ReconcileElasticsearch) internalReconcile(
	ctx context.Context,
	es esv1.Elasticsearch,
	reconcileState *esreconcile.State,
) *reconciler.Results {
	results := reconciler.NewResult(ctx)
	logger := log.FromContext(ctx)

	if es.IsMarkedForDeletion() {
		// resource will be deleted, nothing to reconcile
		deleteErr := r.onDelete(ctx, k8s.ExtractNamespacedName(&es))
		return results.WithError(deleteErr)
	}

	// This is the same validation as the webhook, run again here in case the webhook has not been
	// configured. ctx is rebound to the span's context and keeps that value for the rest of this
	// function, including the driver call below.
	span, ctx := apm.StartSpan(ctx, "validate", tracing.SpanTypeApp)
	_, validationErr := validation.ValidateElasticsearch(ctx, es, r.licenseChecker, r.ExposedNodeLabels)
	span.End()
	if validationErr != nil {
		logger.Error(
			validationErr,
			"Elasticsearch manifest validation failed",
			"namespace", es.Namespace,
			"es_name", es.Name,
		)
		reconcileState.UpdateElasticsearchInvalidWithEvent(validationErr.Error())
		return results
	}

	if warning := validation.CheckForWarnings(es); warning != nil {
		logger.Info(
			"Elasticsearch manifest has warnings. Proceed at your own risk. "+warning.Error(),
			"namespace", es.Namespace,
			"es_name", es.Name,
		)
		reconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonValidation, warning.Error())
	}

	ver, parseErr := commonversion.Parse(es.Spec.Version)
	if parseErr != nil {
		return results.WithError(parseErr)
	}

	supported := esversion.SupportedVersions(ver)
	if supported == nil {
		return results.WithError(pkgerrors.Errorf("unsupported version: %s", ver))
	}

	driverParams := driver.DefaultDriverParameters{
		OperatorParameters: r.Parameters,
		ES:                 es,
		ReconcileState:     reconcileState,
		Client:             r.Client,
		Recorder:           r.recorder,
		Version:            ver,
		Expectations:       r.expectations.ForCluster(k8s.ExtractNamespacedName(&es)),
		Observers:          r.esObservers,
		DynamicWatches:     r.dynamicWatches,
		SupportedVersions:  *supported,
		LicenseChecker:     r.licenseChecker,
	}
	return driver.NewDefaultDriver(driverParams).Reconcile(ctx)
}
// updateStatus applies the reconcile state: every event it produced is recorded on the
// Elasticsearch resource, and, if the state yields a resource to update, that resource's status
// is persisted through common.UpdateStatus. It returns nil when there is nothing to update,
// otherwise the result of the status update.
func (r *ReconcileElasticsearch) updateStatus(
	ctx context.Context,
	es esv1.Elasticsearch,
	reconcileState *esreconcile.State,
) error {
	defer tracing.Span(&ctx)()
	logger := ulog.FromContext(ctx)

	pendingEvents, updated := reconcileState.Apply()
	for i := range pendingEvents {
		evt := pendingEvents[i]
		logger.V(1).Info("Recording event", "event", evt)
		r.recorder.Event(&es, evt.EventType, evt.Reason, evt.Message)
	}

	if updated != nil {
		logger.V(1).Info("Updating status",
			"iteration", atomic.LoadUint64(&r.iteration),
			"namespace", es.Namespace,
			"es_name", es.Name,
			"status", updated.Status,
		)
		return common.UpdateStatus(ctx, r.Client, updated)
	}
	return nil
}
// annotateResource adds the orchestration hints annotation to the Elasticsearch resource. The
// purpose of this annotation is to capture additional state about aspects of the operator's
// orchestration of Elasticsearch resources. Currently, it captures whether transient settings
// are in use. Future expansion is possible if deemed necessary.
//
// The hints are merged over a copy of the resource's current annotations; the resource is only
// updated when the merged result differs from what it already carries. It returns the error
// from serializing the hints or from the update, if any.
func (r *ReconcileElasticsearch) annotateResource(
	ctx context.Context,
	es esv1.Elasticsearch,
	reconcileState *esreconcile.State,
) error {
	// NOTE(review): the context derived from the span is discarded, so the Update call below
	// runs with the caller's context rather than the span's - confirm this is intended.
	span, _ := apm.StartSpan(ctx, "update_hints_annotations", tracing.SpanTypeApp)
	defer span.End()
	logger := ulog.FromContext(ctx)

	// cluster-uuid is a special case of an annotation on the Elasticsearch resource that is not
	// treated here but through an immediate update due to the risk of data loss. See bootstrap package.
	hints, err := reconcileState.OrchestrationHints().AsAnnotation()
	if err != nil {
		return err
	}

	merged := maps.Merge(es.ObjectMeta.DeepCopy().Annotations, hints)
	if !reflect.DeepEqual(merged, es.Annotations) {
		es.SetAnnotations(merged)
		return r.Update(ctx, &es)
	}

	logger.V(1).Info("Skipping annotation update", "es_name", es.Name, "namespace", es.Namespace)
	return nil
}
// onDelete garbage-collects what the operator tracks for a deleted Elasticsearch cluster: its
// expectations, its observer, and the dynamic watch handlers registered on Secrets and
// ConfigMaps under the cluster's watch keys. It finishes by garbage-collecting the soft-owned
// Secrets and returns the error from that last step.
func (r *ReconcileElasticsearch) onDelete(ctx context.Context, es types.NamespacedName) error {
	r.expectations.RemoveCluster(es)
	r.esObservers.StopObserving(es)

	// Keys of the Secret watches set up for this cluster.
	secretWatchKeys := []string{
		keystore.SecureSettingsWatchName(es),
		certificates.CertificateWatchKey(esv1.ESNamer, es.Name),
		transport.CustomTransportCertsWatchKey(es),
		user.UserProvidedRolesWatchName(es),
		user.UserProvidedFileRealmWatchName(es),
	}
	for _, key := range secretWatchKeys {
		r.dynamicWatches.Secrets.RemoveHandlerForKey(key)
	}
	r.dynamicWatches.ConfigMaps.RemoveHandlerForKey(transport.AdditionalCAWatchKey(es))

	return reconciler.GarbageCollectSoftOwnedSecrets(ctx, r.Client, es, esv1.Kind)
}