func()

in pkg/controller/autoscaling/elasticsearch/controller.go [104:250]


// Reconcile handles a reconciliation request for an ElasticsearchAutoscaler resource.
//
// The steps, in order, are:
//  1. Fetch the ElasticsearchAutoscaler; if it no longer exists the dynamic watch registered
//     for it is removed and the reconciliation ends without error.
//  2. Register a dynamic watch on the Elasticsearch resource referenced by the autoscaler.
//  3. Report the autoscaler as inactive if it is unmanaged, if enterprise features are
//     disabled, or if the referenced Elasticsearch resource does not exist.
//  4. Validate the autoscaler manifest and report it as unhealthy on failure.
//  5. Resolve the autoscaled node sets, import existing resources into the status, and call
//     reconcileInternal.
//  6. Persist the autoscaler status, then update the Elasticsearch resource if
//     reconcileInternal returned one.
//
// The returned result and error are those expected by the controller runtime: a non-nil error
// or a result requesting a requeue leads to another reconciliation attempt.
func (r *ReconcileElasticsearchAutoscaler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	ctx = common.NewReconciliationContext(ctx, &r.iteration, r.Tracer, ControllerName, "esa_name", request)
	defer common.LogReconciliationRun(logconf.FromContext(ctx))()
	defer tracing.EndContextTransaction(ctx)

	log := logconf.FromContext(ctx)
	// Fetch the ElasticsearchAutoscaler instance
	var esa autoscalingv1alpha1.ElasticsearchAutoscaler
	if err := r.Get(ctx, request.NamespacedName, &esa); err != nil {
		if apierrors.IsNotFound(err) {
			log.V(1).Info("ElasticsearchAutoscaler not found", "namespace", request.Namespace, "esa_name", request.Name)
			// The autoscaler is gone: stop watching the Elasticsearch resource on its behalf.
			r.Watches.ReferencedResources.RemoveHandlerForKey(dynamicWatchName(request))
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	// Ensure we watch the associated Elasticsearch.
	// The referenced Elasticsearch resource is looked up in the namespace of the autoscaler.
	esNamespacedName := types.NamespacedName{Name: esa.Spec.ElasticsearchRef.Name, Namespace: request.Namespace}
	if err := r.Watches.ReferencedResources.AddHandler(watches.NamedWatch[client.Object]{
		Name:    dynamicWatchName(request),
		Watched: []types.NamespacedName{esNamespacedName},
		Watcher: request.NamespacedName,
	}); err != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	if common.IsUnmanaged(ctx, &esa) {
		msg := "Object is currently not managed by this controller. Skipping reconciliation"
		log.Info(msg, "namespace", request.Namespace, "esa_name", request.Name)
		return r.reportAsInactive(ctx, log, esa, msg)
	}

	enabled, err := r.licenseChecker.EnterpriseFeaturesEnabled(ctx)
	if err != nil {
		// Captured like the other error returns of this function so the failure is traced.
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}
	if !enabled {
		log.Info(enterpriseFeaturesDisabledMsg)
		r.recorder.Eventf(&esa, corev1.EventTypeWarning, license.EventInvalidLicense, enterpriseFeaturesDisabledMsg)
		_, err := r.reportAsInactive(ctx, log, esa, enterpriseFeaturesDisabledMsg)
		// We still schedule a reconciliation in case a valid license is applied later
		return licenseCheckRequeue, err
	}

	// Fetch the Elasticsearch resource
	var es esv1.Elasticsearch
	if err := r.Get(ctx, esNamespacedName, &es); err != nil {
		if apierrors.IsNotFound(err) {
			msg := fmt.Sprintf("Elasticsearch resource %s/%s not found", esNamespacedName.Namespace, esNamespacedName.Name)
			log.Info(msg, "namespace", request.Namespace, "esa_name", request.Name, "es_name", esNamespacedName.Name, "error", err.Error())
			return r.reportAsInactive(ctx, log, esa, msg)
		}
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	// Validate the autoscaling specification
	if validationErr, runtimeErr := validation.ValidateElasticsearchAutoscaler(ctx, r.Client, esa, r.licenseChecker); validationErr != nil || runtimeErr != nil {
		// The log entries identify the autoscaler being validated, consistently with the other
		// "esa_name" log fields of this function.
		if validationErr != nil {
			log.Error(
				validationErr,
				"ElasticsearchAutoscaler manifest validation failed",
				"namespace", esa.Namespace,
				"esa_name", esa.Name,
			)
		}
		if runtimeErr != nil {
			log.Error(
				runtimeErr,
				"Runtime error while validating ElasticsearchAutoscaler manifest",
				"namespace", esa.Namespace,
				"esa_name", esa.Name,
			)
		}
		err := errors.NewAggregate([]error{validationErr, runtimeErr})
		// Status is updated on a best effort basis, the aggregated error is returned regardless.
		_, _ = r.reportAsUnhealthy(ctx, log, esa, err.Error())
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	// Get autoscaling policies and the associated node sets.
	v, err := version.Parse(es.Spec.Version)
	if err != nil {
		// Captured like the other error returns of this function so the failure is traced.
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}
	autoscaledNodeSets, nodeSetErr := es.GetAutoscaledNodeSets(v, esa.Spec.AutoscalingPolicySpecs)
	if nodeSetErr != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, nodeSetErr)
	}
	log.V(1).Info(
		"Autoscaling policies and node sets",
		"policies", autoscaledNodeSets.Names(),
		"namespace", request.Namespace,
		"esa_name", request.Name,
	)

	// Import existing resources in the current Status if the cluster is managed by some autoscaling policies but
	// the status annotation does not exist.
	if err := status.ImportExistingResources(
		log,
		r.Client,
		esa.Spec.AutoscalingPolicySpecs,
		es,
		autoscaledNodeSets,
		&esa.Status,
	); err != nil {
		_, _ = r.reportAsUnhealthy(ctx, log, esa, fmt.Sprintf("error while importing resources from the status subresource: %s", err.Error()))
		// Status is updated on a best effort basis, we don't really care if the error above is not reported.
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	statusBuilder := newStatusBuilder(log, esa.Spec.AutoscalingPolicySpecs)
	results := &reconciler.Results{}

	// Call the main function
	reconciledEs, reconcileInternalErr := r.reconcileInternal(ctx, es, statusBuilder, autoscaledNodeSets, &esa)
	if reconcileInternalErr != nil {
		// we do not return immediately as not all errors prevent to compute a reconciled Elasticsearch resource.
		results.WithError(reconcileInternalErr)
	}

	// Update the new status
	newStatus := statusBuilder.Build()
	esa.Status.ObservedGeneration = ptr.To[int64](esa.Generation)
	esa.Status.Conditions = esa.Status.Conditions.MergeWith(newStatus.Conditions...)
	esa.Status.AutoscalingPolicyStatuses = newStatus.AutoscalingPolicyStatuses
	updateStatus, err := r.updateStatus(ctx, log, esa)
	if err != nil {
		return reconcile.Result{}, err
	}
	results.WithResult(updateStatus)

	if reconciledEs == nil {
		// No Elasticsearch resource, with up-to-date compute and storage resources, has been returned.
		// It's likely to be the case if a fatal error prevented resource calculation from the Elasticsearch
		// autoscaling API or if an "offline" reconciliation failed.
		return results.Aggregate()
	}

	// Update the Elasticsearch resource
	if err := r.Client.Update(ctx, reconciledEs); err != nil {
		if apierrors.IsConflict(err) {
			// A conflict is not reported as an error: a requeue is requested instead.
			return results.WithResult(reconcile.Result{Requeue: true}).Aggregate()
		}
		return results.WithError(err).Aggregate()
	}
	return results.WithResults(defaultResult(&esa)).Aggregate()
}