func (d *defaultDriver) Reconcile

in pkg/controller/elasticsearch/driver/driver.go [128:405]


func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
	results := reconciler.NewResult(ctx)
	log := ulog.FromContext(ctx)

	// garbage collect secrets attached to this cluster that we don't need anymore
	if err := cleanup.DeleteOrphanedSecrets(ctx, d.Client, d.ES); err != nil {
		return results.WithError(err)
	}

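	// reconcile the ConfigMap holding the scripts mounted into the Elasticsearch Pods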
	if err := configmap.ReconcileScriptsConfigMap(ctx, d.Client, d.ES); err != nil {
		return results.WithError(err)
	}

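	// reconcile the transport Service used for Elasticsearch node-to-node communication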
	_, err := common.ReconcileService(ctx, d.Client, services.NewTransportService(d.ES), &d.ES)
	if err != nil {
		return results.WithError(err)
	}

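	// reconcile the external Service exposing the Elasticsearch HTTP endpoint; requeue while a deleted Service is being recreated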
	externalService, err := common.ReconcileService(ctx, d.Client, services.NewExternalService(d.ES), &d.ES)
	if err != nil {
		if k8serrors.IsAlreadyExists(err) {
			return results.WithReconciliationState(defaultRequeue.WithReason(fmt.Sprintf("Pending %s service recreation", services.ExternalServiceName(d.ES.Name))))
		}
		return results.WithError(err)
	}

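	// reconcile the internal Service, which the operator itself relies on to reach Elasticsearch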
	var internalService *corev1.Service
	internalService, err = common.ReconcileService(ctx, d.Client, services.NewInternalService(d.ES), &d.ES)
	if err != nil {
		return results.WithError(err)
	}

	// Remote Cluster Server (RCS2) Kubernetes Service reconciliation.
	if d.ES.Spec.RemoteClusterServer.Enabled {
		// Remote Cluster Server is enabled, ensure that the related Kubernetes Service exists.
		if _, err := common.ReconcileService(ctx, d.Client, services.NewRemoteClusterService(d.ES), &d.ES); err != nil {
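			// record the error but keep going: the remaining steps should not be blocked, and the error surfaces in the aggregated results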
			results.WithError(err)
		}
	} else {
		// Ensure that the remote cluster Service does not exist.
		remoteClusterService := &corev1.Service{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: d.ES.Namespace,
				Name:      services.RemoteClusterServiceName(d.ES.Name),
			},
		}
		results.WithError(k8s.DeleteResourceIfExists(ctx, d.Client, remoteClusterService))
	}

	resourcesState, err := reconcile.NewResourcesStateFromAPI(d.Client, d.ES)
	if err != nil {
		return results.WithError(err)
	}

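	// emit a warning event if any Pod appears to run an unsupported Elasticsearch distribution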
	warnUnsupportedDistro(resourcesState.AllPods, d.ReconcileState.Recorder)

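	// reconcile internal users and roles; controllerUser is the user the operator itself uses to call the Elasticsearch API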
	controllerUser, err := user.ReconcileUsersAndRoles(ctx, d.Client, d.ES, d.DynamicWatches(), d.Recorder(), d.OperatorParameters.PasswordHasher)
	if err != nil {
		return results.WithError(err)
	}

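	// reconcile TLS certificates for the HTTP layer, covering both the external and internal Services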
	trustedHTTPCertificates, res := certificates.ReconcileHTTP(
		ctx,
		d,
		d.ES,
		[]corev1.Service{*externalService, *internalService},
		d.OperatorParameters.GlobalCA,
		d.OperatorParameters.CACertRotation,
		d.OperatorParameters.CertRotation,
	)
	results.WithResults(res)
	if res != nil && res.HasError() {
		return results
	}

	// start the ES observer
	minVersion, err := version.MinInPods(resourcesState.CurrentPods, label.VersionLabelName)
	if err != nil {
		return results.WithError(err)
	}
	if minVersion == nil {
		minVersion = &d.Version
	}

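	// build the URLs the operator can use to reach the cluster, and check whether any Service endpoints exist yet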
	urlProvider := services.NewElasticsearchURLProvider(d.ES, d.Client)
	hasEndpoints := urlProvider.HasEndpoints()

	observedState := d.Observers.ObservedStateResolver(
		ctx,
		d.ES,
		d.elasticsearchClientProvider(
			ctx,
			urlProvider,
			controllerUser,
			*minVersion,
			trustedHTTPCertificates,
		),
		hasEndpoints,
	)

	// Always update the Elasticsearch state bits with the latest observed state.
	d.ReconcileState.
		UpdateClusterHealth(observedState()).         // Elasticsearch cluster health
		UpdateAvailableNodes(*resourcesState).        // Available nodes
		UpdateMinRunningVersion(ctx, *resourcesState) // Min running version

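	// reconcile TLS certificates for the transport layer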
	res = certificates.ReconcileTransport(
		ctx,
		d,
		d.ES,
		d.OperatorParameters.GlobalCA,
		d.OperatorParameters.CACertRotation,
		d.OperatorParameters.CertRotation,
	)
	results.WithResults(res)
	if res != nil && res.HasError() {
		return results
	}

	// Patch the Pods to add the expected node labels as annotations. Record the error, if any, but do not stop the
	// reconciliation loop as we don't want to prevent other updates from being applied to the cluster.
	results.WithResults(annotatePodsWithNodeLabels(ctx, d.Client, d.ES))

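	// verify the existing Pods run a supported version (in particular, refuse downgrades unless explicitly allowed)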
	if err := d.verifySupportsExistingPods(resourcesState.CurrentPods); err != nil {
		if !d.ES.IsConfiguredToAllowDowngrades() {
			return results.WithError(err)
		}
		log.Info("Allowing downgrade on user request", "warning", err.Error())
	}

	// TODO: support user-supplied certificate (non-ca)
	esClient := d.newElasticsearchClient(
		ctx,
		urlProvider,
		controllerUser,
		*minVersion,
		trustedHTTPCertificates,
	)
	defer esClient.Close()

	// use unknown health as a proxy for a cluster not responding to requests
	hasKnownHealthState := observedState() != esv1.ElasticsearchUnknownHealth
	esReachable := hasEndpoints && hasKnownHealthState
	// report condition in Pod status
	if esReachable {
		d.ReconcileState.ReportCondition(esv1.ElasticsearchIsReachable, corev1.ConditionTrue, esReachableConditionMessage(internalService, hasEndpoints, hasKnownHealthState))
	} else {
		d.ReconcileState.ReportCondition(esv1.ElasticsearchIsReachable, corev1.ConditionFalse, esReachableConditionMessage(internalService, hasEndpoints, hasKnownHealthState))
	}

	var currentLicense esclient.License
	if esReachable {
		currentLicense, err = license.CheckElasticsearchLicense(ctx, esClient)
		var e *license.GetLicenseError
		if errors.As(err, &e) {
			if !e.SupportedDistribution {
				msg := "Unsupported Elasticsearch distribution"
				// unsupported distribution, let's update the phase to "invalid" and stop the reconciliation
				d.ReconcileState.
					UpdateWithPhase(esv1.ElasticsearchResourceInvalid).
					AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, fmt.Sprintf("%s: %s", msg, err.Error()))
				return results.WithError(errors.Wrap(err, strings.ToLower(msg[0:1])+msg[1:]))
			}
			// update esReachable to bypass steps that require ES to be up, so reconciliation is not blocked for long periods
			esReachable = e.EsReachable
		}
		if err != nil {
			msg := "Could not verify license, re-queuing"
			log.Info(msg, "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
			d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, fmt.Sprintf("%s: %s", msg, err.Error()))
			results.WithReconciliationState(defaultRequeue.WithReason(msg))
		}
	}

	// Update the service account orchestration hint. This is done early in the reconciliation loop to unblock association
	// controllers that may be waiting for the orchestration hint.
	results.WithError(d.maybeSetServiceAccountsOrchestrationHint(ctx, esReachable, esClient, resourcesState))

	// reconcile the Elasticsearch license, even if the cluster might not be responding to requests,
	// to cover the case of expired licenses where all health API responses are 403
	if hasEndpoints {
		err = license.Reconcile(ctx, d.Client, d.ES, esClient, currentLicense)
		if err != nil {
			msg := "Could not reconcile cluster license, re-queuing"
			// only log an event if Elasticsearch is in a state where success of this API call can be expected. The API call itself
			// will be logged by the client
			if hasKnownHealthState {
				log.Info(msg, "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
				d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, fmt.Sprintf("%s: %s", msg, err.Error()))
			}
			results.WithReconciliationState(defaultRequeue.WithReason(msg))
		}
	}

	// reconcile remote clusters
	if esReachable {
		requeue, err := remotecluster.UpdateSettings(ctx, d.Client, esClient, d.Recorder(), d.LicenseChecker, d.ES)
		msg := "Could not update remote clusters in Elasticsearch settings, re-queuing"
		if err != nil {
			log.Info(msg, "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
			d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, msg)
			results.WithError(err)
		}
		if requeue {
			results.WithReconciliationState(defaultRequeue.WithReason("Updating remote cluster settings, re-queuing"))
		}
	}

	// Compute seed hosts based on current masters with a podIP
	if err := settings.UpdateSeedHostsConfigMap(ctx, d.Client, d.ES, resourcesState.AllPods); err != nil {
		return results.WithError(err)
	}

	// reconcile an empty file-based settings Secret if it doesn't exist
	if d.Version.GTE(filesettings.FileBasedSettingsMinPreVersion) {
		err = filesettings.ReconcileEmptyFileSettingsSecret(ctx, d.Client, d.ES, true)
		if err != nil {
			return results.WithError(err)
		}
	}

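	// the keystore init container runs with a security context appropriate for this Elasticsearch version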
	keystoreParams := initcontainer.KeystoreParams
	keystoreSecurityContext := securitycontext.For(d.Version, true)
	keystoreParams.SecurityContext = &keystoreSecurityContext

	// Set up a keystore with secure settings in an init container, if specified by the user.
	// We are also using the keystore internally for the remote cluster API keys.
	remoteClusterAPIKeys, err := apiKeyStoreSecretSource(ctx, &d.ES, d.Client)
	if err != nil {
		return results.WithError(err)
	}
	keystoreResources, err := keystore.ReconcileResources(
		ctx,
		d,
		&d.ES,
		esv1.ESNamer,
		label.NewLabels(k8s.ExtractNamespacedName(&d.ES)),
		keystoreParams,
		remoteClusterAPIKeys...,
	)
	if err != nil {
		return results.WithError(err)
	}

	// set an annotation with the ClusterUUID, if bootstrapped
	requeue, err := bootstrap.ReconcileClusterUUID(ctx, d.Client, &d.ES, esClient, esReachable)
	if err != nil {
		return results.WithError(err)
	}
	if requeue {
		results = results.WithReconciliationState(defaultRequeue.WithReason("Elasticsearch cluster UUID is not reconciled"))
	}

	// reconcile Beats config Secrets if Stack Monitoring is defined
	err = stackmon.ReconcileConfigSecrets(ctx, d.Client, d.ES)
	if err != nil {
		return results.WithError(err)
	}

	// requeue if associations are defined but not yet configured, otherwise we may be in a situation where we deploy
	// Elasticsearch Pods once, then change their spec a few seconds later once the association is configured
	areAssocsConfigured, err := association.AreConfiguredIfSet(ctx, d.ES.GetAssociations(), d.Recorder())
	if err != nil {
		return results.WithError(err)
	}
	if !areAssocsConfigured {
		results.WithReconciliationState(defaultRequeue.WithReason("Some associations are not reconciled"))
	}

	// reconcile suspended Pods before reconciling node specs: Pod suspension is a debugging and
	// troubleshooting tool that does not follow the change budget restrictions
	if err := reconcileSuspendedPods(ctx, d.Client, d.ES, d.Expectations); err != nil {
		return results.WithError(err)
	}

	// reconcile StatefulSets and nodes configuration
	return results.WithResults(d.reconcileNodeSpecs(ctx, esReachable, esClient, d.ReconcileState, *resourcesState, keystoreResources))
}
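
// Note: a minimal sketch of how a caller would typically consume the returned
// *reconciler.Results, assuming ECK's reconciler package (Aggregate folds the
// recorded errors and requeue requests into a single controller-runtime result);
// the exact call site is an assumption, not part of this excerpt:
//
//	results := driver.Reconcile(ctx)
//	result, err := results.Aggregate()
//	return result, err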