pkg/controller/autoscaling/elasticsearch/controller.go (306 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package elasticsearch
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
autoscalingv1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/autoscaling/v1alpha1"
"github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1alpha1"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/autoscaling/elasticsearch/status"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/autoscaling/elasticsearch/validation"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
commonesclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/esclient"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/license"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches"
esclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
logconf "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/net"
)
// EsClientProvider is the signature of a factory which builds an Elasticsearch client (esclient.Client) for the
// provided Elasticsearch resource, given a Kubernetes client and a dialer. It returns an error if the client
// cannot be created. NewReconciler wires commonesclient.NewClient as the implementation.
type EsClientProvider func(ctx context.Context, c k8s.Client, dialer net.Dialer, es esv1.Elasticsearch) (esclient.Client, error)
const (
// ControllerName is the name of the autoscaling controller based on the dedicated Elasticsearch autoscaling resource. It supersedes the legacy
// controller which is reading the autoscaling specification in an annotation on the Elasticsearch resource.
ControllerName = "elasticsearch-autoscaler"
// enterpriseFeaturesDisabledMsg is the message which is logged, emitted as a warning event and stored in the
// status of the ElasticsearchAutoscaler when the license checker reports that enterprise features are not enabled.
enterpriseFeaturesDisabledMsg = "Autoscaling is an enterprise feature. Enterprise features are disabled"
)
// licenseCheckRequeue is the default duration used to retry a licence check if the cluster is supposed to be managed by
// the autoscaling controller and if the licence is not valid.
// It requests a new reconciliation 60 seconds later, so that a valid license applied in the meantime is picked up.
var licenseCheckRequeue = reconcile.Result{
Requeue: true,
RequeueAfter: 60 * time.Second,
}
// baseReconcileAutoscaling is the base struct for both the legacy and the CRD based reconcilers.
// It bundles the dependencies shared by the reconcilers: a Kubernetes client, the operator parameters,
// an Elasticsearch client factory, an event recorder and a license checker.
type baseReconcileAutoscaling struct {
// Client is the Kubernetes client used to read resources and to update them and their status.
k8s.Client
// Parameters holds the operator parameters (for instance the operator namespace used to build the license checker).
operator.Parameters
// esClientProvider builds Elasticsearch clients.
// NOTE(review): not called in this part of the file, presumably used by reconcileInternal - confirm.
esClientProvider EsClientProvider
// recorder is used to emit Kubernetes events on the reconciled resource.
recorder record.EventRecorder
// licenseChecker is used to check if enterprise features are enabled.
licenseChecker license.Checker
// iteration is the number of times this controller has run its Reconcile method
iteration uint64 //nolint:structcheck
}
// withRecorder returns a copy of the receiver in which the event recorder is replaced by the provided one.
// The receiver is passed by value, the instance held by the caller is therefore left untouched.
func (r baseReconcileAutoscaling) withRecorder(recorder record.EventRecorder) baseReconcileAutoscaling {
	updated := r
	updated.recorder = recorder
	return updated
}
// ReconcileElasticsearchAutoscaler reconciles autoscaling policies and Elasticsearch resources specifications based on
// Elasticsearch autoscaling API response.
type ReconcileElasticsearchAutoscaler struct {
// baseReconcileAutoscaling provides the dependencies shared with the legacy reconciler.
baseReconcileAutoscaling
// Watches holds the dynamic watches, it is used to watch the Elasticsearch resource referenced by an autoscaler.
Watches watches.DynamicWatches
}
// NewReconciler builds the reconciler for ElasticsearchAutoscaler resources. The Kubernetes client and the event
// recorder are retrieved from the manager, the license checker is scoped to the operator namespace found in params,
// and the set of dynamic watches is initially empty.
//
// NOTE(review): the event recorder is assigned in the struct literal and then again through withRecorder, one of
// the two assignments looks redundant. Both are preserved here to keep the behavior unchanged.
func NewReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileElasticsearchAutoscaler {
	k8sClient := mgr.GetClient()
	base := baseReconcileAutoscaling{
		Client:           k8sClient,
		Parameters:       params,
		esClientProvider: commonesclient.NewClient,
		recorder:         mgr.GetEventRecorderFor(ControllerName),
		licenseChecker:   license.NewLicenseChecker(k8sClient, params.OperatorNamespace),
	}
	base = base.withRecorder(mgr.GetEventRecorderFor(ControllerName))

	esaReconciler := &ReconcileElasticsearchAutoscaler{
		baseReconcileAutoscaling: base,
		Watches:                  watches.NewDynamicWatches(),
	}
	return esaReconciler
}
// dynamicWatchName returns the name of the dynamic watch set on the Elasticsearch resource referenced by the
// autoscaler identified by the request. The name is "<namespace>-<name>-referenced-es-watch".
func dynamicWatchName(request reconcile.Request) string {
	const suffix = "-referenced-es-watch"
	return request.Namespace + "-" + request.Name + suffix
}
// Reconcile reads the ElasticsearchAutoscaler identified by the request and reconciles the Elasticsearch resource
// it references. The main steps are:
//
//  1. Fetch the autoscaler. If it does not exist anymore the dynamic watch is removed and the reconciliation stops.
//  2. Register a dynamic watch on the referenced Elasticsearch resource.
//  3. Report the autoscaler as inactive if it is unmanaged, if enterprise features are disabled (a new attempt is
//     scheduled in that case) or if the referenced Elasticsearch resource does not exist.
//  4. Validate the autoscaler, it is reported as unhealthy if the validation fails.
//  5. Call reconcileInternal, update the status of the autoscaler and, if a reconciled Elasticsearch resource
//     has been returned, update the Elasticsearch resource.
//
// On success a new reconciliation is scheduled according to the polling period of the autoscaler. Update conflicts
// lead to an immediate requeue without error.
func (r *ReconcileElasticsearchAutoscaler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	ctx = common.NewReconciliationContext(ctx, &r.iteration, r.Tracer, ControllerName, "esa_name", request)
	defer common.LogReconciliationRun(logconf.FromContext(ctx))()
	defer tracing.EndContextTransaction(ctx)
	log := logconf.FromContext(ctx)

	// Fetch the ElasticsearchAutoscaler instance
	var esa autoscalingv1alpha1.ElasticsearchAutoscaler
	if err := r.Get(ctx, request.NamespacedName, &esa); err != nil {
		if apierrors.IsNotFound(err) {
			log.V(1).Info("ElasticsearchAutoscaler not found", "namespace", request.Namespace, "esa_name", request.Name)
			// The autoscaler does not exist anymore, the watch on the referenced Elasticsearch is not needed.
			r.Watches.ReferencedResources.RemoveHandlerForKey(dynamicWatchName(request))
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	// Ensure we watch the associated Elasticsearch, it is looked up in the namespace of the autoscaler.
	esNamespacedName := types.NamespacedName{Name: esa.Spec.ElasticsearchRef.Name, Namespace: request.Namespace}
	if err := r.Watches.ReferencedResources.AddHandler(watches.NamedWatch[client.Object]{
		Name:    dynamicWatchName(request),
		Watched: []types.NamespacedName{esNamespacedName},
		Watcher: request.NamespacedName,
	}); err != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	if common.IsUnmanaged(ctx, &esa) {
		msg := "Object is currently not managed by this controller. Skipping reconciliation"
		log.Info(msg, "namespace", request.Namespace, "esa_name", request.Name)
		return r.reportAsInactive(ctx, log, esa, msg)
	}

	enabled, err := r.licenseChecker.EnterpriseFeaturesEnabled(ctx)
	if err != nil {
		// Capture the error like the other error paths of this function.
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}
	if !enabled {
		log.Info(enterpriseFeaturesDisabledMsg)
		r.recorder.Eventf(&esa, corev1.EventTypeWarning, license.EventInvalidLicense, enterpriseFeaturesDisabledMsg)
		_, err := r.reportAsInactive(ctx, log, esa, enterpriseFeaturesDisabledMsg)
		// We still schedule a reconciliation in case a valid license is applied later
		return licenseCheckRequeue, err
	}

	// Fetch the Elasticsearch resource
	var es esv1.Elasticsearch
	if err := r.Get(ctx, esNamespacedName, &es); err != nil {
		if apierrors.IsNotFound(err) {
			msg := fmt.Sprintf("Elasticsearch resource %s/%s not found", esNamespacedName.Namespace, esNamespacedName.Name)
			log.Info(msg, "namespace", request.Namespace, "esa_name", request.Name, "es_name", esNamespacedName.Name, "error", err.Error())
			return r.reportAsInactive(ctx, log, esa, msg)
		}
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	// Validate the autoscaling specification
	if validationErr, runtimeErr := validation.ValidateElasticsearchAutoscaler(ctx, r.Client, esa, r.licenseChecker); validationErr != nil || runtimeErr != nil {
		// The "esa_name" key must hold the name of the autoscaler, the name of the Elasticsearch resource
		// is reported with the "es_name" key.
		if validationErr != nil {
			log.Error(
				validationErr,
				"ElasticsearchAutoscaler manifest validation failed",
				"namespace", esa.Namespace,
				"esa_name", esa.Name,
				"es_name", es.Name,
			)
		}
		if runtimeErr != nil {
			log.Error(
				runtimeErr,
				"Runtime error while validating ElasticsearchAutoscaler manifest",
				"namespace", esa.Namespace,
				"esa_name", esa.Name,
				"es_name", es.Name,
			)
		}
		err := errors.NewAggregate([]error{validationErr, runtimeErr})
		// Status is updated on a best effort basis, the validation error is the one returned to the caller.
		_, _ = r.reportAsUnhealthy(ctx, log, esa, err.Error())
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	// Get autoscaling policies and the associated node sets.
	v, err := version.Parse(es.Spec.Version)
	if err != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}
	autoscaledNodeSets, nodeSetErr := es.GetAutoscaledNodeSets(v, esa.Spec.AutoscalingPolicySpecs)
	if nodeSetErr != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, nodeSetErr)
	}
	log.V(1).Info(
		"Autoscaling policies and node sets",
		"policies", autoscaledNodeSets.Names(),
		"namespace", request.Namespace,
		"esa_name", request.Name,
	)

	// Import existing resources in the current Status if the cluster is managed by some autoscaling policies but
	// the status annotation does not exist.
	if err := status.ImportExistingResources(
		log,
		r.Client,
		esa.Spec.AutoscalingPolicySpecs,
		es,
		autoscaledNodeSets,
		&esa.Status,
	); err != nil {
		_, _ = r.reportAsUnhealthy(ctx, log, esa, fmt.Sprintf("error while importing resources from the status subresource: %s", err.Error()))
		// Status is updated on a best effort basis, we don't really care if the error above is not reported.
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	statusBuilder := newStatusBuilder(log, esa.Spec.AutoscalingPolicySpecs)
	results := &reconciler.Results{}

	// Call the main function
	reconciledEs, reconcileInternalErr := r.reconcileInternal(ctx, es, statusBuilder, autoscaledNodeSets, &esa)
	if reconcileInternalErr != nil {
		// we do not return immediately as not all errors prevent to compute a reconciled Elasticsearch resource.
		results.WithError(reconcileInternalErr)
	}

	// Update the new status
	newStatus := statusBuilder.Build()
	esa.Status.ObservedGeneration = ptr.To[int64](esa.Generation)
	esa.Status.Conditions = esa.Status.Conditions.MergeWith(newStatus.Conditions...)
	esa.Status.AutoscalingPolicyStatuses = newStatus.AutoscalingPolicyStatuses
	updateStatus, err := r.updateStatus(ctx, log, esa)
	if err != nil {
		// Aggregate with the error which may have been returned by reconcileInternal so that it is not lost.
		return results.WithError(err).Aggregate()
	}
	results.WithResult(updateStatus)

	if reconciledEs == nil {
		// No Elasticsearch resource, with up-to-date compute and storage resources, has been returned.
		// It's likely to be the case if a fatal error prevented resource calculation from the Elasticsearch
		// autoscaling API or if an "offline" reconciliation failed.
		return results.Aggregate()
	}

	// Update the Elasticsearch resource
	if err := r.Client.Update(ctx, reconciledEs); err != nil {
		if apierrors.IsConflict(err) {
			// A conflict is not reported as an error, a new attempt is made immediately.
			return results.WithResult(reconcile.Result{Requeue: true}).Aggregate()
		}
		return results.WithError(err).Aggregate()
	}
	return results.WithResults(defaultResult(&esa)).Aggregate()
}
// reportAsUnhealthy reports the autoscaler as unhealthy in the status: the Active condition is set to true while
// the Healthy condition, which carries the provided message, and the Online condition are set to false.
// The Limited condition is set to unknown only if it is not already present in the status.
// The observed generation is set to the current generation of the autoscaler. The function returns the outcome
// of the status update.
func (r *ReconcileElasticsearchAutoscaler) reportAsUnhealthy(
	ctx context.Context,
	log logr.Logger,
	esa autoscalingv1alpha1.ElasticsearchAutoscaler,
	message string,
) (reconcile.Result, error) {
	const unhealthyMsg = "Autoscaler is unhealthy"
	transitionTime := metav1.Now()

	unhealthyConditions := []v1alpha1.Condition{
		{
			Type:               v1alpha1.ElasticsearchAutoscalerActive,
			Status:             corev1.ConditionTrue,
			LastTransitionTime: transitionTime,
			Message:            unhealthyMsg,
		},
		{
			Type:               v1alpha1.ElasticsearchAutoscalerHealthy,
			Status:             corev1.ConditionFalse,
			LastTransitionTime: transitionTime,
			Message:            message,
		},
		{
			Type:               v1alpha1.ElasticsearchAutoscalerOnline,
			Status:             corev1.ConditionFalse,
			LastTransitionTime: transitionTime,
			Message:            unhealthyMsg,
		},
	}

	// Work on a copy of the current status.
	updated := esa.Status.DeepCopy()
	updated.ObservedGeneration = ptr.To[int64](esa.Generation)
	updated.Conditions = updated.Conditions.MergeWith(unhealthyConditions...)

	// An existing Limited condition is preserved, an unknown one is only inserted if there is none.
	if limitedIdx := updated.Conditions.Index(v1alpha1.ElasticsearchAutoscalerLimited); limitedIdx < 0 {
		limited := v1alpha1.Condition{
			Type:               v1alpha1.ElasticsearchAutoscalerLimited,
			Status:             corev1.ConditionUnknown,
			LastTransitionTime: transitionTime,
		}
		updated.Conditions = updated.Conditions.MergeWith(limited)
	}

	esa.Status = *updated
	return r.updateStatus(ctx, log, esa)
}
// reportAsInactive reports the autoscaler as inactive in the status: the Active condition, which carries the
// provided message, and the Online condition are set to false, while the Healthy and the Limited conditions are
// set to unknown. The observed generation is set to the current generation of the autoscaler.
// The function returns the outcome of the status update.
func (r *ReconcileElasticsearchAutoscaler) reportAsInactive(
	ctx context.Context,
	log logr.Logger,
	esa autoscalingv1alpha1.ElasticsearchAutoscaler,
	message string,
) (reconcile.Result, error) {
	const inactiveMsg = "Autoscaler is inactive"
	transitionTime := metav1.Now()

	inactiveConditions := []v1alpha1.Condition{
		{
			Type:               v1alpha1.ElasticsearchAutoscalerActive,
			Status:             corev1.ConditionFalse,
			LastTransitionTime: transitionTime,
			Message:            message,
		},
		{
			Type:               v1alpha1.ElasticsearchAutoscalerHealthy,
			Status:             corev1.ConditionUnknown,
			LastTransitionTime: transitionTime,
			Message:            inactiveMsg,
		},
		{
			Type:               v1alpha1.ElasticsearchAutoscalerOnline,
			Status:             corev1.ConditionFalse,
			LastTransitionTime: transitionTime,
			Message:            inactiveMsg,
		},
		{
			Type:               v1alpha1.ElasticsearchAutoscalerLimited,
			Status:             corev1.ConditionUnknown,
			LastTransitionTime: transitionTime,
		},
	}

	// Work on a copy of the current status.
	updated := esa.Status.DeepCopy()
	updated.ObservedGeneration = ptr.To[int64](esa.Generation)
	updated.Conditions = updated.Conditions.MergeWith(inactiveConditions...)

	esa.Status = *updated
	return r.updateStatus(ctx, log, esa)
}
// updateStatus persists the status of the provided autoscaler using the status subresource client.
// A conflict is logged at debug level and translated into a requeue without error, any other error is captured
// for tracing and returned. An empty result is returned if the update succeeds.
func (r *ReconcileElasticsearchAutoscaler) updateStatus(
	ctx context.Context,
	log logr.Logger,
	esa autoscalingv1alpha1.ElasticsearchAutoscaler,
) (reconcile.Result, error) {
	outcome := &reconciler.Results{}
	updateErr := r.Client.Status().Update(ctx, &esa)
	switch {
	case updateErr == nil:
		return outcome.Aggregate()
	case apierrors.IsConflict(updateErr):
		log.V(1).Info(
			"Conflict while updating the status",
			"namespace", esa.Namespace,
			"esa_name", esa.Name,
			"error", updateErr.Error(),
		)
		return outcome.WithResult(reconcile.Result{Requeue: true}).Aggregate()
	default:
		return outcome.WithError(tracing.CaptureError(ctx, updateErr)).Aggregate()
	}
}
// defaultResult returns the results used to schedule the next reconciliation of an autoscaling resource.
// The delay is the polling period of the autoscaling specification if there is one, v1alpha1.DefaultPollingPeriod
// otherwise. If the polling period cannot be retrieved the returned results only hold the error.
func defaultResult(autoscalingSpecification v1alpha1.AutoscalingResource) *reconciler.Results {
	outcome := reconciler.Results{}

	period, err := autoscalingSpecification.GetPollingPeriod()
	if err != nil {
		return outcome.WithError(err)
	}

	// Fall back to the default polling period if none is specified.
	delay := v1alpha1.DefaultPollingPeriod
	if period != nil {
		delay = period.Duration
	}

	next := reconcile.Result{
		Requeue:      true,
		RequeueAfter: delay,
	}
	return outcome.WithResult(next)
}