in pkg/controller/elasticsearch/driver/desired_nodes.go [25:96]
// updateDesiredNodes derives the desired nodes from expectedResources, reports the
// outcome through the ResourcesAwareManagement condition and, when Elasticsearch is
// reachable, synchronizes the result with the desired nodes API through esClient.
//
// Outcomes, in the order they are evaluated:
//   - the Elasticsearch version in the spec cannot be parsed: the error is returned.
//   - ToDesiredNodes fails with an error other than ResourceNotAvailable: the condition
//     is set to Unknown and the error is returned.
//   - ToDesiredNodes fails with ResourceNotAvailable: the condition is set to False and
//     the desired nodes API is cleared if Elasticsearch is reachable, otherwise a
//     requeue is requested.
//   - ToDesiredNodes succeeds: the condition is set to True and the desired nodes are
//     pushed with the next version, unless the orchestration hints report that the
//     latest version and the computed hash are already known. A requeue is requested
//     if Elasticsearch is not reachable.
func (d *defaultDriver) updateDesiredNodes(
	ctx context.Context,
	esClient esclient.Client,
	esReachable bool,
	expectedResources nodespec.ResourcesList,
) *reconciler.Results {
	span, ctx := apm.StartSpan(ctx, "update_desired_nodes", tracing.SpanTypeApp)
	defer span.End()

	res := &reconciler.Results{}

	parsedVersion, err := version.Parse(d.ES.Spec.Version)
	if err != nil {
		return res.WithError(err)
	}

	// The desired nodes are always computed, even if Elasticsearch is not reachable,
	// so that the condition can be reported.
	desiredNodes, capacityIncomplete, err := expectedResources.ToDesiredNodes(ctx, d.Client, parsedVersion.FinalizeVersion())
	if err != nil {
		var notAvailable *nodespec.ResourceNotAvailable
		if !errors.As(err, &notAvailable) {
			// Neither a success nor a ResourceNotAvailable error: surface it as is.
			d.ReconcileState.ReportCondition(
				esv1.ResourcesAwareManagement,
				corev1.ConditionUnknown,
				fmt.Sprintf("Error while calculating compute and storage resources from Elasticsearch resource generation %d: %s", d.ES.Generation, err.Error()),
			)
			return res.WithError(err)
		}

		// The Elasticsearch specification does not allow the desired nodes to be built.
		// This is only reported through the condition, not returned as an error.
		d.ReconcileState.ReportCondition(
			esv1.ResourcesAwareManagement,
			corev1.ConditionFalse,
			fmt.Sprintf("Cannot get compute and storage resources from Elasticsearch resource generation %d: %s", d.ES.Generation, err.Error()),
		)
		// Degraded (not resources aware) mode: the desired nodes API must be cleared,
		// which requires Elasticsearch to be reachable.
		if !esReachable {
			return res.WithReconciliationState(defaultRequeue.WithReason("Desired nodes API must be cleared"))
		}
		return res.WithError(esClient.DeleteDesiredNodes(ctx))
	}

	d.ReconcileState.ReportCondition(
		esv1.ResourcesAwareManagement,
		corev1.ConditionTrue,
		fmt.Sprintf("Successfully calculated compute and storage resources from Elasticsearch resource generation %d", d.ES.Generation),
	)

	if capacityIncomplete {
		res.WithReconciliationState(defaultRequeue.WithReason("Storage capacity is not available in all PVC statuses, requeue to refine the capacity reported in the desired nodes API"))
	}

	if !esReachable {
		return res.WithReconciliationState(defaultRequeue.WithReason("Waiting for Elasticsearch to be available to update the desired nodes API"))
	}

	latest, err := esClient.GetLatestDesiredNodes(ctx)
	if err != nil && !esclient.IsNotFound(err) {
		// A 404 is tolerated, any other error aborts the update.
		return res.WithError(err)
	}

	digest := hash.HashObject(desiredNodes)
	if d.ReconcileState.OrchestrationHints().DesiredNodes.Equals(latest.Version, digest) {
		// The hints already match the latest version and the computed hash: nothing to push.
		return res
	}

	next := latest.Version + 1
	err = esClient.UpdateDesiredNodes(ctx, string(d.ES.UID), next, esclient.DesiredNodes{DesiredNodes: desiredNodes})
	if err == nil {
		// Record the pushed version and hash only once the update has succeeded.
		hint := &hints.DesiredNodesHint{
			Version: next,
			Hash:    digest,
		}
		d.ReconcileState.UpdateOrchestrationHints(hints.OrchestrationsHints{DesiredNodes: hint})
	}
	return res.WithError(err)
}