in pkg/controller/autoscaling/elasticsearch/driver.go [98:213]
func (r *baseReconcileAutoscaling) attemptOnlineReconciliation(
ctx context.Context,
es esv1.Elasticsearch,
statusBuilder *v1alpha1.AutoscalingStatusBuilder,
autoscaledNodeSets esv1.AutoscaledNodeSets,
autoscalingResource v1alpha1.AutoscalingResource,
) (*esv1.Elasticsearch, error) {
span, ctx := apm.StartSpan(ctx, "online_reconciliation", tracing.SpanTypeApp)
defer span.End()
autoscalingSpec, err := autoscalingResource.GetAutoscalingPolicySpecs()
if err != nil {
return nil, err
}
log := logconf.FromContext(ctx)
log.V(1).Info("Starting online autoscaling reconciliation")
esClient, err := r.esClientProvider(ctx, r.Client, r.Dialer, es)
if err != nil {
return nil, err
}
defer esClient.Close()
// Update Machine Learning settings
mlNodes, maxMemory := esv1.GetMLNodesSettings(autoscalingSpec)
if err := esClient.UpdateMLNodesSettings(ctx, mlNodes, maxMemory); err != nil {
log.Error(err, "Error while updating the ML settings")
return nil, err
}
// Update autoscaling policies in Elasticsearch
if err := updatePolicies(ctx, log, autoscalingResource, esClient); err != nil {
log.Error(err, "Error while updating the autoscaling policies")
return nil, err
}
// Get capacity requirements from the Elasticsearch autoscaling capacity API
autoscalingCapacityResult, err := esClient.GetAutoscalingCapacity(ctx)
if err != nil {
return nil, err
}
// nextClusterResources holds the resources computed by the autoscaling algorithm for each nodeSet.
var nextClusterResources v1alpha1.ClusterResources
currentAutoscalingStatus, err := autoscalingResource.GetElasticsearchAutoscalerStatus()
if err != nil {
return nil, err
}
var errors []error
// For each autoscaling policy we compute the resources to be applied to the related nodeSets.
for _, autoscalingPolicy := range autoscalingSpec {
// Get the currentNodeSets
nodeSetList, exists := autoscaledNodeSets[autoscalingPolicy.Name]
if !exists {
// This situation should be caught during the validation, we still want to trace this error if it happens.
err := fmt.Errorf("no nodeSets for tier %s", autoscalingPolicy.Name)
log.Error(err, "no nodeSet for a tier", "policy", autoscalingPolicy.Name)
errors = append(errors, err)
statusBuilder.ForPolicy(autoscalingPolicy.Name).RecordEvent(v1alpha1.NoNodeSet, err.Error())
continue
}
// Get the required capacity for this autoscaling policy from the Elasticsearch API
var nodeSetsResources v1alpha1.NodeSetsResources
autoscalingPolicyResult, hasCapacity := autoscalingCapacityResult.Policies[autoscalingPolicy.Name]
if hasCapacity && !autoscalingPolicyResult.RequiredCapacity.IsEmpty() {
// We received a required capacity from Elasticsearch for this policy.
log.Info(
"Required capacity for policy",
"policy", autoscalingPolicy.Name,
"required_capacity", autoscalingPolicyResult.RequiredCapacity,
"current_capacity", autoscalingPolicyResult.CurrentCapacity,
"current_capacity.count", len(autoscalingPolicyResult.CurrentNodes),
"current_nodes", autoscalingPolicyResult.CurrentNodes,
)
ctx, err := autoscaler.NewContext(
log,
autoscalingPolicy,
nodeSetList,
currentAutoscalingStatus,
autoscalingPolicyResult,
statusBuilder,
)
if err != nil {
log.Error(err, "Error while creating autoscaling context for policy", "policy", autoscalingPolicy.Name)
continue
}
nodeSetsResources = ctx.GetResources()
} else {
// We didn't receive a required capacity for this tier, or the response is empty. We can only ensure that resources are within the allowed ranges.
log.V(1).Info(
"No required capacity received from Elasticsearch, ensure resources limits are respected",
"policy", autoscalingPolicy.Name,
)
statusBuilder.ForPolicy(autoscalingPolicy.Name).RecordEvent(v1alpha1.EmptyResponse, "No required capacity from Elasticsearch")
nodeSetsResources = autoscaler.GetOfflineNodeSetsResources(log, nodeSetList.Names(), autoscalingPolicy, currentAutoscalingStatus)
}
// Add the result to the list of the next resources
nextClusterResources = append(nextClusterResources, nodeSetsResources)
}
// Emit the K8S events
status.EmitEvents(es, r.recorder, statusBuilder.Build())
// Update the Elasticsearch resource with the calculated resources.
if err := reconcileElasticsearch(log, &es, nextClusterResources); err != nil {
errors = append(errors, err)
}
if len(errors) > 0 {
return nil, tracing.CaptureError(ctx, k8serrors.NewAggregate(errors))
}
// Register new resources in the status
statusBuilder.UpdateResources(nextClusterResources, currentAutoscalingStatus)
return &es, nil
}