func()

in pkg/controller/elasticsearch/driver/nodes.go [35:243]


// reconcileNodeSpecs drives the StatefulSets backing the Elasticsearch cluster towards
// the expected node specification. Steps run in a fixed order and most of them can end
// the reconciliation early, either with an error or with a requeue request:
//
//  1. wait for autoscaled nodeSets to be synced and for expectations to be satisfied,
//  2. recreate the StatefulSets that need it to account for PVC expansion,
//  3. build the expected resources and, when supported, update the desired nodes,
//  4. apply the expected StatefulSets and scale up,
//  5. reconcile the PDB and the PVC owner references, garbage collect PVCs,
//  6. force-upgrade Pending or bootlooping Pods if needed,
//  7. update zen1/zen2 settings, scale down, then perform rolling upgrades
//     (only when the Elasticsearch API is reachable),
//  8. once node specs are reconciled, remove leftover transient settings and update
//     the service accounts orchestration hint.
//
// esReachable tells whether the Elasticsearch API can be used: when false, the function
// requeues right before step 7. reconcileState receives the events, phase updates and
// reporters used along the way; resourcesState is handed over to the downscale logic and
// keystoreResources to the computation of the expected resources.
//
// The returned Results accumulate the errors and requeue requests raised by the steps.
func (d *defaultDriver) reconcileNodeSpecs(
	ctx context.Context,
	esReachable bool,
	esClient esclient.Client,
	reconcileState *reconcile.State,
	resourcesState reconcile.ResourcesState,
	keystoreResources *keystore.Resources,
) *reconciler.Results {
	span, ctx := apm.StartSpan(ctx, "reconcile_node_spec", tracing.SpanTypeApp)
	defer span.End()
	log := ulog.FromContext(ctx)

	results := &reconciler.Results{}

	// If some nodeSets are managed by the autoscaler, wait for them to be updated.
	synced, err := d.autoscaledResourcesSynced(ctx, d.ES)
	if err != nil {
		// Use a dedicated context so this failure is not mistaken for a StatefulSet recreation error.
		return results.WithError(fmt.Errorf("checking autoscaled node sets synchronization: %w", err))
	}
	if !synced {
		return results.WithReconciliationState(defaultRequeue.WithReason("Waiting for autoscaling controller to sync node sets"))
	}

	// check if actual StatefulSets and corresponding pods match our expectations before applying any change
	ok, reason, err := d.expectationsSatisfied(ctx)
	if err != nil {
		return results.WithError(err)
	}
	if !ok {
		return results.WithReconciliationState(defaultRequeue.WithReason(reason))
	}

	// recreate any StatefulSet that needs to account for PVC expansion
	recreations, err := recreateStatefulSets(ctx, d.K8sClient(), d.ES)
	if err != nil {
		return results.WithError(fmt.Errorf("StatefulSet recreation: %w", err))
	}
	if recreations > 0 {
		// Some StatefulSets are in the process of being recreated to handle PVC expansion:
		// it is safer to requeue until the re-creation is done.
		// Otherwise, some operation could be performed with wrong assumptions:
		// the sset doesn't exist (was just deleted), but the Pods do actually exist.
		log.V(1).Info("StatefulSets recreation in progress, re-queueing.",
			"namespace", d.ES.Namespace, "es_name", d.ES.Name, "recreations", recreations)
		return results.WithReconciliationState(defaultRequeue.WithReason("StatefulSets recreation in progress"))
	}

	actualStatefulSets, err := es_sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&d.ES))
	if err != nil {
		return results.WithError(err)
	}

	expectedResources, err := nodespec.BuildExpectedResources(ctx, d.Client, d.ES, keystoreResources, actualStatefulSets, d.OperatorParameters.IPFamily, d.OperatorParameters.SetDefaultSecurityContext)
	if err != nil {
		return results.WithError(err)
	}

	// Only call the desired nodes logic when the Elasticsearch client reports support for it.
	if esClient.IsDesiredNodesSupported() {
		results.WithResults(d.updateDesiredNodes(ctx, esClient, esReachable, expectedResources))
		if results.HasError() {
			return results
		}
	}

	esState := NewMemoizingESState(ctx, esClient)
	// Phase 1: apply expected StatefulSets resources and scale up.
	upscaleCtx := upscaleCtx{
		parentCtx:            ctx,
		k8sClient:            d.K8sClient(),
		es:                   d.ES,
		esState:              esState,
		expectations:         d.Expectations,
		validateStorageClass: d.OperatorParameters.ValidateStorageClass,
		upscaleReporter:      reconcileState.UpscaleReporter,
	}
	upscaleResults, err := HandleUpscaleAndSpecChanges(upscaleCtx, actualStatefulSets, expectedResources)
	if err != nil {
		reconcileState.AddEvent(corev1.EventTypeWarning, events.EventReconciliationError, fmt.Sprintf("Failed to apply spec change: %v", err))
		var podTemplateErr *sset.PodTemplateError
		if errors.As(err, &podTemplateErr) {
			// An error has been detected in one of the pod templates, let's update the phase to "invalid"
			reconcileState.UpdateElasticsearchInvalidWithEvent(err.Error())
		}
		return results.WithError(err)
	}

	if upscaleResults.Requeue {
		return results.WithReconciliationState(defaultRequeue.WithReason("StatefulSet is scheduled for recreation"))
	}
	if reconcileState.HasPendingNewNodes() {
		// Record the requeue but keep going: the remaining steps still run in this reconciliation.
		results.WithReconciliationState(defaultRequeue.WithReason("Upscale in progress"))
	}
	// From now on work with the StatefulSets as returned by the upscale step.
	actualStatefulSets = upscaleResults.ActualStatefulSets

	// Once all the StatefulSets have been updated we can ensure that the former version of the transport certificates Secret is deleted.
	// A failure here is recorded in the results but does not interrupt the reconciliation.
	if err := transport.DeleteLegacyTransportCertificate(ctx, d.Client, d.ES); err != nil {
		results.WithError(err)
	}

	// Update PDB to account for new replicas.
	if err := pdb.Reconcile(ctx, d.Client, d.ES, actualStatefulSets); err != nil {
		return results.WithError(err)
	}

	if err := reconcilePVCOwnerRefs(ctx, d.K8sClient(), d.ES); err != nil {
		return results.WithError(err)
	}

	if err := GarbageCollectPVCs(ctx, d.K8sClient(), d.ES, actualStatefulSets, expectedResources.StatefulSets()); err != nil {
		return results.WithError(err)
	}

	// Phase 2: if there is any Pending or bootlooping Pod to upgrade, do it.
	attempted, err := d.MaybeForceUpgrade(ctx, actualStatefulSets)
	if err != nil || attempted {
		// If attempted, we're in a transient state where it's safer to requeue.
		// We don't want to re-upgrade in a regular way the pods we just force-upgraded.
		// Next reconciliation will check expectations again.
		reconcileState.UpdateWithPhase(esv1.ElasticsearchApplyingChangesPhase)
		// NOTE(review): err is nil when a force upgrade was attempted successfully; this relies on
		// WithError accepting a nil error - confirm against reconciler.Results.
		return results.WithError(err)
	}

	// Next operations require the Elasticsearch API to be available.
	if !esReachable {
		msg := "Elasticsearch cannot be reached yet, re-queuing"
		log.Info(msg, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
		reconcileState.UpdateWithPhase(esv1.ElasticsearchApplyingChangesPhase)
		return results.WithReconciliationState(defaultRequeue.WithReason(msg))
	}

	// Maybe update Zen1 minimum master nodes through the API, corresponding to the current nodes we have.
	requeue, err := zen1.UpdateMinimumMasterNodes(ctx, d.Client, d.ES, esClient, actualStatefulSets)
	if err != nil {
		return results.WithError(err)
	}
	if requeue {
		results.WithReconciliationState(defaultRequeue.WithReason("Not enough available masters to update Zen1 settings"))
	}
	// Remove the zen2 bootstrap annotation if bootstrap is over.
	requeue, err = zen2.RemoveZen2BootstrapAnnotation(ctx, d.Client, d.ES, esClient)
	if err != nil {
		return results.WithError(err)
	}
	if requeue {
		results.WithReconciliationState(defaultRequeue.WithReason("Initial cluster bootstrap is not complete"))
	}
	// Maybe clear zen2 voting config exclusions.
	requeue, err = zen2.ClearVotingConfigExclusions(ctx, d.ES, d.Client, esClient, actualStatefulSets)
	if err != nil {
		return results.WithError(fmt.Errorf("when clearing voting exclusions: %w", err))
	}
	if requeue {
		results.WithReconciliationState(defaultRequeue.WithReason("Cannot clear voting exclusions yet"))
	}
	// shutdown logic is dependent on Elasticsearch version
	nodeShutdowns, err := newShutdownInterface(ctx, d.ES, esClient, esState, reconcileState.StatusReporter)
	if err != nil {
		return results.WithError(err)
	}

	// Phase 3: handle sset scale down.
	// We want to safely remove nodes from the cluster, either because the sset requires less replicas,
	// or because it should be removed entirely.
	downscaleCtx := newDownscaleContext(
		ctx,
		d.Client,
		esClient,
		resourcesState,
		reconcileState,
		d.Expectations,
		d.ES,
		nodeShutdowns,
	)

	downscaleRes := HandleDownscale(downscaleCtx, expectedResources.StatefulSets(), actualStatefulSets)
	results.WithResults(downscaleRes)
	if downscaleRes.HasError() {
		return results
	}

	// Phase 4: handle rolling upgrades.
	rollingUpgradesRes := d.handleUpgrades(ctx, esClient, esState, expectedResources)
	results.WithResults(rollingUpgradesRes)
	if rollingUpgradesRes.HasError() {
		return results
	}

	isNodeSpecsReconciled := d.isNodeSpecsReconciled(ctx, actualStatefulSets, d.Client, results)
	// as of 7.15.2 with node shutdown we do not need transient settings anymore and in fact want to remove any left-overs.
	// esReachable is necessarily true here (the function returned earlier otherwise); the check is kept as a safeguard.
	if esReachable && isNodeSpecsReconciled {
		if err := d.maybeRemoveTransientSettings(ctx, esClient); err != nil {
			return results.WithError(err)
		}
	}

	// Set or update an orchestration hint to let the association controller know if service accounts are supported.
	if isNodeSpecsReconciled {
		allNodesRunningServiceAccounts, err := esv1.AreServiceAccountsSupported(d.ES.Spec.Version)
		if err != nil {
			return results.WithError(err)
		}
		// All the nodes are now running with a reconciled node specification. Consequently, we know that all the nodes have now a symbolic
		// link in the Elasticsearch configuration directory that allows them to read the service account tokens generated by the operator.
		// Depending on the Elasticsearch version we can surface the capability of the operator to handle (or not) service account tokens.
		d.ReconcileState.UpdateOrchestrationHints(
			d.ReconcileState.OrchestrationHints().Merge(hints.OrchestrationsHints{ServiceAccounts: optional.NewBool(allNodesRunningServiceAccounts)}),
		)
	}

	return results
}