in pkg/controller/elasticsearch/driver/nodes.go [35:243]
func (d *defaultDriver) reconcileNodeSpecs(
	ctx context.Context,
	esReachable bool,
	esClient esclient.Client,
	reconcileState *reconcile.State,
	resourcesState reconcile.ResourcesState,
	keystoreResources *keystore.Resources,
) *reconciler.Results {
	span, ctx := apm.StartSpan(ctx, "reconcile_node_spec", tracing.SpanTypeApp)
	defer span.End()

	log := ulog.FromContext(ctx)
	results := &reconciler.Results{}

	// If some nodeSets are managed by the autoscaler, wait for them to be updated.
	if ok, err := d.autoscaledResourcesSynced(ctx, d.ES); err != nil {
		return results.WithError(fmt.Errorf("autoscaled resources check: %w", err))
	} else if !ok {
		return results.WithReconciliationState(defaultRequeue.WithReason("Waiting for autoscaling controller to sync node sets"))
	}

	// Check that the actual StatefulSets and corresponding Pods match our expectations before applying any change.
	ok, reason, err := d.expectationsSatisfied(ctx)
	if err != nil {
		return results.WithError(err)
	}
	if !ok {
		return results.WithReconciliationState(defaultRequeue.WithReason(reason))
	}

	// Recreate any StatefulSet that needs to account for PVC expansion.
	recreations, err := recreateStatefulSets(ctx, d.K8sClient(), d.ES)
	if err != nil {
		return results.WithError(fmt.Errorf("StatefulSet recreation: %w", err))
	}
	if recreations > 0 {
		// Some StatefulSets are in the process of being recreated to handle PVC expansion:
		// it is safer to requeue until the re-creation is done.
		// Otherwise, operations could run under wrong assumptions: the StatefulSet does not
		// exist (it was just deleted), but its Pods still do.
		log.V(1).Info("StatefulSets recreation in progress, re-queueing.",
			"namespace", d.ES.Namespace, "es_name", d.ES.Name, "recreations", recreations)
		return results.WithReconciliationState(defaultRequeue.WithReason("StatefulSets recreation in progress"))
	}

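	// Fetch the StatefulSets that currently exist in the API server for this cluster.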
	actualStatefulSets, err := es_sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&d.ES))
	if err != nil {
		return results.WithError(err)
	}

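	// Build the resources we expect from the spec: one StatefulSet per nodeSet, along with
	// the associated configuration and headless Service resources.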
	expectedResources, err := nodespec.BuildExpectedResources(ctx, d.Client, d.ES, keystoreResources, actualStatefulSets, d.OperatorParameters.IPFamily, d.OperatorParameters.SetDefaultSecurityContext)
	if err != nil {
		return results.WithError(err)
	}

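	// If the Elasticsearch version supports the desired nodes API, declare the expected
	// topology to the cluster ahead of applying it.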
	if esClient.IsDesiredNodesSupported() {
		results.WithResults(d.updateDesiredNodes(ctx, esClient, esReachable, expectedResources))
		if results.HasError() {
			return results
		}
	}

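	// Memoize Elasticsearch state lookups: each request is made at most once and reused for
	// the remainder of this reconciliation.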
	esState := NewMemoizingESState(ctx, esClient)

	// Phase 1: apply expected StatefulSets resources and scale up.
	upscaleCtx := upscaleCtx{
		parentCtx:            ctx,
		k8sClient:            d.K8sClient(),
		es:                   d.ES,
		esState:              esState,
		expectations:         d.Expectations,
		validateStorageClass: d.OperatorParameters.ValidateStorageClass,
		upscaleReporter:      reconcileState.UpscaleReporter,
	}
	upscaleResults, err := HandleUpscaleAndSpecChanges(upscaleCtx, actualStatefulSets, expectedResources)
	if err != nil {
		reconcileState.AddEvent(corev1.EventTypeWarning, events.EventReconciliationError, fmt.Sprintf("Failed to apply spec change: %v", err))
		var podTemplateErr *sset.PodTemplateError
		if errors.As(err, &podTemplateErr) {
			// An error has been detected in one of the pod templates: update the phase to "invalid".
			reconcileState.UpdateElasticsearchInvalidWithEvent(err.Error())
		}
		return results.WithError(err)
	}
	if upscaleResults.Requeue {
		return results.WithReconciliationState(defaultRequeue.WithReason("StatefulSet is scheduled for recreation"))
	}
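	// Some expected nodes have been requested but are not running yet: requeue until the
	// upscale completes.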
	if reconcileState.HasPendingNewNodes() {
		results.WithReconciliationState(defaultRequeue.WithReason("Upscale in progress"))
	}
	actualStatefulSets = upscaleResults.ActualStatefulSets

	// Once all the StatefulSets have been updated, we can ensure the former version of the transport certificates Secret is deleted.
	if err := transport.DeleteLegacyTransportCertificate(ctx, d.Client, d.ES); err != nil {
		results.WithError(err)
	}

	// Update the PDB to account for the new replicas.
	if err := pdb.Reconcile(ctx, d.Client, d.ES, actualStatefulSets); err != nil {
		return results.WithError(err)
	}

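	// Align PVC ownerReferences with the configured volume claim deletion policy, so PVCs
	// are either retained or garbage-collected along with the cluster.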
	if err := reconcilePVCOwnerRefs(ctx, d.K8sClient(), d.ES); err != nil {
		return results.WithError(err)
	}

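	// Delete PVCs that are no longer claimed by any expected StatefulSet, e.g. after a
	// nodeSet has been renamed or removed.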
	if err := GarbageCollectPVCs(ctx, d.K8sClient(), d.ES, actualStatefulSets, expectedResources.StatefulSets()); err != nil {
		return results.WithError(err)
	}

	// Phase 2: if there are any Pending or bootlooping Pods to upgrade, force-upgrade them.
	attempted, err := d.MaybeForceUpgrade(ctx, actualStatefulSets)
	if err != nil || attempted {
		// If an upgrade was attempted, we're in a transient state where it's safer to requeue.
		// We don't want to run a regular upgrade on the Pods we just force-upgraded.
		// The next reconciliation will check expectations again.
		reconcileState.UpdateWithPhase(esv1.ElasticsearchApplyingChangesPhase)
		return results.WithError(err)
	}

	// The next operations require the Elasticsearch API to be available.
	if !esReachable {
		msg := "Elasticsearch cannot be reached yet, re-queuing"
		log.Info(msg, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
		reconcileState.UpdateWithPhase(esv1.ElasticsearchApplyingChangesPhase)
		return results.WithReconciliationState(defaultRequeue.WithReason(msg))
	}

	// Maybe update the Zen1 minimum master nodes setting through the API, to match the master nodes we currently have.
	requeue, err := zen1.UpdateMinimumMasterNodes(ctx, d.Client, d.ES, esClient, actualStatefulSets)
	if err != nil {
		return results.WithError(err)
	}
	if requeue {
		results.WithReconciliationState(defaultRequeue.WithReason("Not enough available masters to update Zen1 settings"))
	}
	// Remove the zen2 bootstrap annotation if bootstrap is over.
	requeue, err = zen2.RemoveZen2BootstrapAnnotation(ctx, d.Client, d.ES, esClient)
	if err != nil {
		return results.WithError(err)
	}
	if requeue {
		results.WithReconciliationState(defaultRequeue.WithReason("Initial cluster bootstrap is not complete"))
	}
	// Maybe clear zen2 voting config exclusions.
	requeue, err = zen2.ClearVotingConfigExclusions(ctx, d.ES, d.Client, esClient, actualStatefulSets)
	if err != nil {
		return results.WithError(fmt.Errorf("when clearing voting exclusions: %w", err))
	}
	if requeue {
		results.WithReconciliationState(defaultRequeue.WithReason("Cannot clear voting exclusions yet"))
	}

	// The shutdown logic depends on the Elasticsearch version: recent versions use the node
	// shutdown API, older ones fall back to allocation filtering.
	nodeShutdowns, err := newShutdownInterface(ctx, d.ES, esClient, esState, reconcileState.StatusReporter)
	if err != nil {
		return results.WithError(err)
	}

	// Phase 2: handle sset scale down.
	// We want to safely remove nodes from the cluster, either because the sset requires fewer replicas,
	// or because it should be removed entirely.
	downscaleCtx := newDownscaleContext(
		ctx,
		d.Client,
		esClient,
		resourcesState,
		reconcileState,
		d.Expectations,
		d.ES,
		nodeShutdowns,
	)
	downscaleRes := HandleDownscale(downscaleCtx, expectedResources.StatefulSets(), actualStatefulSets)
	results.WithResults(downscaleRes)
	if downscaleRes.HasError() {
		return results
	}

	// Phase 3: handle rolling upgrades.
	rollingUpgradesRes := d.handleUpgrades(ctx, esClient, esState, expectedResources)
	results.WithResults(rollingUpgradesRes)
	if rollingUpgradesRes.HasError() {
		return results
	}

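	// Check whether the actual StatefulSets now match the expected node specification, with
	// no changes still pending: the follow-up steps only make sense on a converged cluster.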
	isNodeSpecsReconciled := d.isNodeSpecsReconciled(ctx, actualStatefulSets, d.Client, results)

	// As of 7.15.2, with node shutdown, we no longer need transient settings and in fact want to remove any leftovers.
	if esReachable && isNodeSpecsReconciled {
		if err := d.maybeRemoveTransientSettings(ctx, esClient); err != nil {
			return results.WithError(err)
		}
	}

	// Set or update an orchestration hint to let the association controller know whether service accounts are supported.
	if isNodeSpecsReconciled {
		allNodesRunningServiceAccounts, err := esv1.AreServiceAccountsSupported(d.ES.Spec.Version)
		if err != nil {
			return results.WithError(err)
		}
		// All the nodes are now running with a reconciled node specification. Consequently, we know that every node
		// now has a symbolic link in the Elasticsearch configuration directory that allows it to read the service
		// account tokens generated by the operator. Depending on the Elasticsearch version, we can surface whether
		// the operator is able to handle service account tokens.
		d.ReconcileState.UpdateOrchestrationHints(
			d.ReconcileState.OrchestrationHints().Merge(hints.OrchestrationsHints{ServiceAccounts: optional.NewBool(allNodesRunningServiceAccounts)}),
		)
	}

	return results
}