// Excerpt of pkg/controller/autoscaling/elasticsearch/driver.go (original lines 98-213).

// attemptOnlineReconciliation drives one round of "online" autoscaling: it pushes the
// current ML settings and autoscaling policies to Elasticsearch, asks the Elasticsearch
// autoscaling capacity API for the required capacity of each policy, computes the next
// resources for the nodeSets backing each policy, and applies them to the Elasticsearch
// resource. Per-policy failures are collected and aggregated so a single broken policy
// does not prevent the remaining policies from being reconciled.
// It returns the updated Elasticsearch resource, or a (possibly aggregated) error.
func (r *baseReconcileAutoscaling) attemptOnlineReconciliation(
	ctx context.Context,
	es esv1.Elasticsearch,
	statusBuilder *v1alpha1.AutoscalingStatusBuilder,
	autoscaledNodeSets esv1.AutoscaledNodeSets,
	autoscalingResource v1alpha1.AutoscalingResource,
) (*esv1.Elasticsearch, error) {
	span, ctx := apm.StartSpan(ctx, "online_reconciliation", tracing.SpanTypeApp)
	defer span.End()
	autoscalingSpec, err := autoscalingResource.GetAutoscalingPolicySpecs()
	if err != nil {
		return nil, err
	}
	log := logconf.FromContext(ctx)
	log.V(1).Info("Starting online autoscaling reconciliation")
	esClient, err := r.esClientProvider(ctx, r.Client, r.Dialer, es)
	if err != nil {
		return nil, err
	}
	defer esClient.Close()

	// Update Machine Learning settings
	mlNodes, maxMemory := esv1.GetMLNodesSettings(autoscalingSpec)
	if err := esClient.UpdateMLNodesSettings(ctx, mlNodes, maxMemory); err != nil {
		log.Error(err, "Error while updating the ML settings")
		return nil, err
	}

	// Update autoscaling policies in Elasticsearch
	if err := updatePolicies(ctx, log, autoscalingResource, esClient); err != nil {
		log.Error(err, "Error while updating the autoscaling policies")
		return nil, err
	}

	// Get capacity requirements from the Elasticsearch autoscaling capacity API
	autoscalingCapacityResult, err := esClient.GetAutoscalingCapacity(ctx)
	if err != nil {
		return nil, err
	}

	// nextClusterResources holds the resources computed by the autoscaling algorithm for each nodeSet.
	var nextClusterResources v1alpha1.ClusterResources

	currentAutoscalingStatus, err := autoscalingResource.GetElasticsearchAutoscalerStatus()
	if err != nil {
		return nil, err
	}
	// errs collects per-policy failures; they are aggregated before returning so all
	// policies get a chance to be reconciled in this round. (Named errs to avoid
	// shadowing the standard library errors package.)
	var errs []error
	// For each autoscaling policy we compute the resources to be applied to the related nodeSets.
	for _, autoscalingPolicy := range autoscalingSpec {
		// Get the currentNodeSets
		nodeSetList, exists := autoscaledNodeSets[autoscalingPolicy.Name]
		if !exists {
			// This situation should be caught during the validation, we still want to trace this error if it happens.
			err := fmt.Errorf("no nodeSets for tier %s", autoscalingPolicy.Name)
			log.Error(err, "no nodeSet for a tier", "policy", autoscalingPolicy.Name)
			errs = append(errs, err)
			statusBuilder.ForPolicy(autoscalingPolicy.Name).RecordEvent(v1alpha1.NoNodeSet, err.Error())
			continue
		}

		// Get the required capacity for this autoscaling policy from the Elasticsearch API
		var nodeSetsResources v1alpha1.NodeSetsResources
		autoscalingPolicyResult, hasCapacity := autoscalingCapacityResult.Policies[autoscalingPolicy.Name]
		if hasCapacity && !autoscalingPolicyResult.RequiredCapacity.IsEmpty() {
			// We received a required capacity from Elasticsearch for this policy.
			log.Info(
				"Required capacity for policy",
				"policy", autoscalingPolicy.Name,
				"required_capacity", autoscalingPolicyResult.RequiredCapacity,
				"current_capacity", autoscalingPolicyResult.CurrentCapacity,
				"current_capacity.count", len(autoscalingPolicyResult.CurrentNodes),
				"current_nodes", autoscalingPolicyResult.CurrentNodes,
			)
			// autoscalerCtx is the autoscaler's own computation context, not a
			// context.Context; a distinct name avoids shadowing ctx above.
			autoscalerCtx, err := autoscaler.NewContext(
				log,
				autoscalingPolicy,
				nodeSetList,
				currentAutoscalingStatus,
				autoscalingPolicyResult,
				statusBuilder,
			)
			if err != nil {
				log.Error(err, "Error while creating autoscaling context for policy", "policy", autoscalingPolicy.Name)
				// Aggregate the error instead of silently skipping this policy:
				// previously the reconciliation could report success even though
				// a policy's resources were never computed.
				errs = append(errs, err)
				continue
			}
			nodeSetsResources = autoscalerCtx.GetResources()
		} else {
			// We didn't receive a required capacity for this tier, or the response is empty. We can only ensure that resources are within the allowed ranges.
			log.V(1).Info(
				"No required capacity received from Elasticsearch, ensure resources limits are respected",
				"policy", autoscalingPolicy.Name,
			)
			statusBuilder.ForPolicy(autoscalingPolicy.Name).RecordEvent(v1alpha1.EmptyResponse, "No required capacity from Elasticsearch")
			nodeSetsResources = autoscaler.GetOfflineNodeSetsResources(log, nodeSetList.Names(), autoscalingPolicy, currentAutoscalingStatus)
		}
		// Add the result to the list of the next resources
		nextClusterResources = append(nextClusterResources, nodeSetsResources)
	}

	// Emit the K8S events
	status.EmitEvents(es, r.recorder, statusBuilder.Build())

	// Update the Elasticsearch resource with the calculated resources.
	if err := reconcileElasticsearch(log, &es, nextClusterResources); err != nil {
		errs = append(errs, err)
	}
	if len(errs) > 0 {
		return nil, tracing.CaptureError(ctx, k8serrors.NewAggregate(errs))
	}

	// Register new resources in the status
	statusBuilder.UpdateResources(nextClusterResources, currentAutoscalingStatus)

	return &es, nil
}