in pkg/controller/autoscaling/elasticsearch/controller.go [104:250]
func (r *ReconcileElasticsearchAutoscaler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
ctx = common.NewReconciliationContext(ctx, &r.iteration, r.Tracer, ControllerName, "esa_name", request)
defer common.LogReconciliationRun(logconf.FromContext(ctx))()
defer tracing.EndContextTransaction(ctx)
log := logconf.FromContext(ctx)
// Fetch the ElasticsearchAutoscaler instance
var esa autoscalingv1alpha1.ElasticsearchAutoscaler
if err := r.Get(ctx, request.NamespacedName, &esa); err != nil {
if apierrors.IsNotFound(err) {
log.V(1).Info("ElasticsearchAutoscaler not found", "namespace", request.Namespace, "esa_name", request.Name)
r.Watches.ReferencedResources.RemoveHandlerForKey(dynamicWatchName(request))
return reconcile.Result{}, nil
}
return reconcile.Result{}, tracing.CaptureError(ctx, err)
}
// Ensure we watch the associated Elasticsearch
esNamespacedName := types.NamespacedName{Name: esa.Spec.ElasticsearchRef.Name, Namespace: request.Namespace}
if err := r.Watches.ReferencedResources.AddHandler(watches.NamedWatch[client.Object]{
Name: dynamicWatchName(request),
Watched: []types.NamespacedName{esNamespacedName},
Watcher: request.NamespacedName,
}); err != nil {
return reconcile.Result{}, tracing.CaptureError(ctx, err)
}
if common.IsUnmanaged(ctx, &esa) {
msg := "Object is currently not managed by this controller. Skipping reconciliation"
log.Info(msg, "namespace", request.Namespace, "esa_name", request.Name)
return r.reportAsInactive(ctx, log, esa, msg)
}
enabled, err := r.licenseChecker.EnterpriseFeaturesEnabled(ctx)
if err != nil {
return reconcile.Result{}, err
}
if !enabled {
log.Info(enterpriseFeaturesDisabledMsg)
r.recorder.Eventf(&esa, corev1.EventTypeWarning, license.EventInvalidLicense, enterpriseFeaturesDisabledMsg)
_, err := r.reportAsInactive(ctx, log, esa, enterpriseFeaturesDisabledMsg)
// We still schedule a reconciliation in case a valid license is applied later
return licenseCheckRequeue, err
}
// Fetch the Elasticsearch resource
var es esv1.Elasticsearch
if err := r.Get(ctx, esNamespacedName, &es); err != nil {
if apierrors.IsNotFound(err) {
msg := fmt.Sprintf("Elasticsearch resource %s/%s not found", esNamespacedName.Namespace, esNamespacedName.Name)
log.Info(msg, "namespace", request.Namespace, "esa_name", request.Name, "es_name", esNamespacedName.Name, "error", err.Error())
return r.reportAsInactive(ctx, log, esa, msg)
}
return reconcile.Result{}, tracing.CaptureError(ctx, err)
}
// Validate the autoscaling specification
if validationErr, runtimeErr := validation.ValidateElasticsearchAutoscaler(ctx, r.Client, esa, r.licenseChecker); validationErr != nil || runtimeErr != nil {
if validationErr != nil {
log.Error(
validationErr,
"ElasticsearchAutoscaler manifest validation failed",
"namespace", es.Namespace,
"esa_name", es.Name,
)
}
if runtimeErr != nil {
log.Error(
runtimeErr,
"Runtime error while validating ElasticsearchAutoscaler manifest",
"namespace", es.Namespace,
"esa_name", es.Name,
)
}
err := errors.NewAggregate([]error{validationErr, runtimeErr})
_, _ = r.reportAsUnhealthy(ctx, log, esa, err.Error())
return reconcile.Result{}, tracing.CaptureError(ctx, err)
}
// Get autoscaling policies and the associated node sets.
v, err := version.Parse(es.Spec.Version)
if err != nil {
return reconcile.Result{}, err
}
autoscaledNodeSets, nodeSetErr := es.GetAutoscaledNodeSets(v, esa.Spec.AutoscalingPolicySpecs)
if nodeSetErr != nil {
return reconcile.Result{}, tracing.CaptureError(ctx, nodeSetErr)
}
log.V(1).Info(
"Autoscaling policies and node sets",
"policies", autoscaledNodeSets.Names(),
"namespace", request.Namespace,
"esa_name", request.Name,
)
// Import existing resources in the current Status if the cluster is managed by some autoscaling policies but
// the status annotation does not exist.
if err := status.ImportExistingResources(
log,
r.Client,
esa.Spec.AutoscalingPolicySpecs,
es,
autoscaledNodeSets,
&esa.Status,
); err != nil {
_, _ = r.reportAsUnhealthy(ctx, log, esa, fmt.Sprintf("error while importing resources from the status subresource: %s", err.Error()))
// Status is updated on a best effort basis, we don't really care if the error above is not reported.
return reconcile.Result{}, tracing.CaptureError(ctx, err)
}
statusBuilder := newStatusBuilder(log, esa.Spec.AutoscalingPolicySpecs)
results := &reconciler.Results{}
// Call the main function
reconciledEs, reconcileInternalErr := r.reconcileInternal(ctx, es, statusBuilder, autoscaledNodeSets, &esa)
if reconcileInternalErr != nil {
// we do not return immediately as not all errors prevent to compute a reconciled Elasticsearch resource.
results.WithError(reconcileInternalErr)
}
// Update the new status
newStatus := statusBuilder.Build()
esa.Status.ObservedGeneration = ptr.To[int64](esa.Generation)
esa.Status.Conditions = esa.Status.Conditions.MergeWith(newStatus.Conditions...)
esa.Status.AutoscalingPolicyStatuses = newStatus.AutoscalingPolicyStatuses
updateStatus, err := r.updateStatus(ctx, log, esa)
if err != nil {
return reconcile.Result{}, err
}
results.WithResult(updateStatus)
if reconciledEs == nil {
// No Elasticsearch resource, with up-to-date compute and storage resources, has been returned.
// It's likely to be the case if a fatal error prevented resource calculation from the Elasticsearch
// autoscaling API or if an "offline" reconciliation failed.
return results.Aggregate()
}
// Update the Elasticsearch resource
if err := r.Client.Update(ctx, reconciledEs); err != nil {
if apierrors.IsConflict(err) {
return results.WithResult(reconcile.Result{Requeue: true}).Aggregate()
}
return results.WithError(err).Aggregate()
}
return results.WithResults(defaultResult(&esa)).Aggregate()
}