in pkg/controller/elasticsearch/driver/driver.go [128:405]
func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
results := reconciler.NewResult(ctx)
log := ulog.FromContext(ctx)
// garbage collect secrets attached to this cluster that we don't need anymore
if err := cleanup.DeleteOrphanedSecrets(ctx, d.Client, d.ES); err != nil {
return results.WithError(err)
}
if err := configmap.ReconcileScriptsConfigMap(ctx, d.Client, d.ES); err != nil {
return results.WithError(err)
}
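// reconcile the transport Service used for node-to-node communication within the Elasticsearch cluster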
_, err := common.ReconcileService(ctx, d.Client, services.NewTransportService(d.ES), &d.ES)
if err != nil {
return results.WithError(err)
}
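// reconcile the external HTTP Service; an AlreadyExists error means the Service is pending recreation, so requeue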
externalService, err := common.ReconcileService(ctx, d.Client, services.NewExternalService(d.ES), &d.ES)
if err != nil {
if k8serrors.IsAlreadyExists(err) {
return results.WithReconciliationState(defaultRequeue.WithReason(fmt.Sprintf("Pending %s service recreation", services.ExternalServiceName(d.ES.Name))))
}
return results.WithError(err)
}
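// reconcile the internal HTTP Service, used by the operator and other internal clients to reach Elasticsearch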
var internalService *corev1.Service
internalService, err = common.ReconcileService(ctx, d.Client, services.NewInternalService(d.ES), &d.ES)
if err != nil {
return results.WithError(err)
}
// Remote Cluster Server (RCS2) Kubernetes Service reconciliation.
if d.ES.Spec.RemoteClusterServer.Enabled {
// Remote Cluster Server is enabled, ensure that the related Kubernetes Service exists.
if _, err := common.ReconcileService(ctx, d.Client, services.NewRemoteClusterService(d.ES), &d.ES); err != nil {
results.WithError(err)
}
} else {
// Ensure that the remote cluster Service does not exist.
remoteClusterService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: d.ES.Namespace,
Name: services.RemoteClusterServiceName(d.ES.Name),
},
}
results.WithError(k8s.DeleteResourceIfExists(ctx, d.Client, remoteClusterService))
}
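// retrieve the current Pods and other Kubernetes resources owned by this cluster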
resourcesState, err := reconcile.NewResourcesStateFromAPI(d.Client, d.ES)
if err != nil {
return results.WithError(err)
}
warnUnsupportedDistro(resourcesState.AllPods, d.ReconcileState.Recorder)
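// reconcile internal users and roles; controllerUser is the user the operator uses to call the Elasticsearch API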
controllerUser, err := user.ReconcileUsersAndRoles(ctx, d.Client, d.ES, d.DynamicWatches(), d.Recorder(), d.OperatorParameters.PasswordHasher)
if err != nil {
return results.WithError(err)
}
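// reconcile HTTP TLS certificates for the HTTP Services and collect the CA certificates to trust when calling Elasticsearch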
trustedHTTPCertificates, res := certificates.ReconcileHTTP(
ctx,
d,
d.ES,
[]corev1.Service{*externalService, *internalService},
d.OperatorParameters.GlobalCA,
d.OperatorParameters.CACertRotation,
d.OperatorParameters.CertRotation,
)
results.WithResults(res)
if res != nil && res.HasError() {
return results
}
// start the ES observer
minVersion, err := version.MinInPods(resourcesState.CurrentPods, label.VersionLabelName)
if err != nil {
return results.WithError(err)
}
if minVersion == nil {
minVersion = &d.Version
}
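// build the URL provider used to reach Elasticsearch and check whether any endpoints are currently available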
urlProvider := services.NewElasticsearchURLProvider(d.ES, d.Client)
hasEndpoints := urlProvider.HasEndpoints()
observedState := d.Observers.ObservedStateResolver(
ctx,
d.ES,
d.elasticsearchClientProvider(
ctx,
urlProvider,
controllerUser,
*minVersion,
trustedHTTPCertificates,
),
hasEndpoints,
)
// Always update the Elasticsearch state bits with the latest observed state.
d.ReconcileState.
UpdateClusterHealth(observedState()). // Elasticsearch cluster health
UpdateAvailableNodes(*resourcesState). // Available nodes
UpdateMinRunningVersion(ctx, *resourcesState) // Min running version
res = certificates.ReconcileTransport(
ctx,
d,
d.ES,
d.OperatorParameters.GlobalCA,
d.OperatorParameters.CACertRotation,
d.OperatorParameters.CertRotation,
)
results.WithResults(res)
if res != nil && res.HasError() {
return results
}
// Patch the Pods to add the expected node labels as annotations. Record the error, if any, but do not stop the
// reconciliation loop as we don't want to prevent other updates from being applied to the cluster.
results.WithResults(annotatePodsWithNodeLabels(ctx, d.Client, d.ES))
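// verify that the operator supports the Elasticsearch versions of the existing Pods, unless downgrades are explicitly allowed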
if err := d.verifySupportsExistingPods(resourcesState.CurrentPods); err != nil {
if !d.ES.IsConfiguredToAllowDowngrades() {
return results.WithError(err)
}
log.Info("Allowing downgrade on user request", "warning", err.Error())
}
// TODO: support user-supplied certificate (non-ca)
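// build an Elasticsearch API client authenticated as the controller user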
esClient := d.newElasticsearchClient(
ctx,
urlProvider,
controllerUser,
*minVersion,
trustedHTTPCertificates,
)
defer esClient.Close()
// use unknown health as a proxy for a cluster not responding to requests
hasKnownHealthState := observedState() != esv1.ElasticsearchUnknownHealth
esReachable := hasEndpoints && hasKnownHealthState
// report condition in Pod status
if esReachable {
d.ReconcileState.ReportCondition(esv1.ElasticsearchIsReachable, corev1.ConditionTrue, esReachableConditionMessage(internalService, hasEndpoints, hasKnownHealthState))
} else {
d.ReconcileState.ReportCondition(esv1.ElasticsearchIsReachable, corev1.ConditionFalse, esReachableConditionMessage(internalService, hasEndpoints, hasKnownHealthState))
}
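// retrieve the currently applied Elasticsearch license, if the cluster is reachable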
var currentLicense esclient.License
if esReachable {
currentLicense, err = license.CheckElasticsearchLicense(ctx, esClient)
var e *license.GetLicenseError
if errors.As(err, &e) {
if !e.SupportedDistribution {
msg := "Unsupported Elasticsearch distribution"
// unsupported distribution, let's update the phase to "invalid" and stop the reconciliation
d.ReconcileState.
UpdateWithPhase(esv1.ElasticsearchResourceInvalid).
AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, fmt.Sprintf("%s: %s", msg, err.Error()))
return results.WithError(errors.Wrap(err, strings.ToLower(msg[0:1])+msg[1:]))
}
// update esReachable to bypass steps that require ES to be up, so reconciliation is not blocked for long periods
esReachable = e.EsReachable
}
if err != nil {
msg := "Could not verify license, re-queuing"
log.Info(msg, "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, fmt.Sprintf("%s: %s", msg, err.Error()))
results.WithReconciliationState(defaultRequeue.WithReason(msg))
}
}
// Update the service account orchestration hint. This is done early in the reconciliation loop to unblock association
// controllers that may be waiting for the orchestration hint.
results.WithError(d.maybeSetServiceAccountsOrchestrationHint(ctx, esReachable, esClient, resourcesState))
// reconcile the Elasticsearch license even if the cluster might not be responding to requests, to cover the case of
// an expired license where all health API responses are 403
if hasEndpoints {
err = license.Reconcile(ctx, d.Client, d.ES, esClient, currentLicense)
if err != nil {
msg := "Could not reconcile cluster license, re-queuing"
// only log an event if Elasticsearch is in a state where success of this API call can be expected. The API call itself
// will be logged by the client
if hasKnownHealthState {
log.Info(msg, "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, fmt.Sprintf("%s: %s", msg, err.Error()))
}
results.WithReconciliationState(defaultRequeue.WithReason(msg))
}
}
// reconcile remote clusters
if esReachable {
requeue, err := remotecluster.UpdateSettings(ctx, d.Client, esClient, d.Recorder(), d.LicenseChecker, d.ES)
msg := "Could not update remote clusters in Elasticsearch settings, re-queuing"
if err != nil {
log.Info(msg, "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, msg)
results.WithError(err)
}
if requeue {
results.WithReconciliationState(defaultRequeue.WithReason("Updating remote cluster settings, re-queuing"))
}
}
// Compute seed hosts based on current masters with a podIP
if err := settings.UpdateSeedHostsConfigMap(ctx, d.Client, d.ES, resourcesState.AllPods); err != nil {
return results.WithError(err)
}
// reconcile an empty file-based settings Secret if it doesn't exist yet
if d.Version.GTE(filesettings.FileBasedSettingsMinPreVersion) {
err = filesettings.ReconcileEmptyFileSettingsSecret(ctx, d.Client, d.ES, true)
if err != nil {
return results.WithError(err)
}
}
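// configure the keystore init container, including a security context appropriate for the Elasticsearch version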
keystoreParams := initcontainer.KeystoreParams
keystoreSecurityContext := securitycontext.For(d.Version, true)
keystoreParams.SecurityContext = &keystoreSecurityContext
// Set up a keystore with secure settings in an init container, if specified by the user.
// The keystore is also used internally for the remote cluster API keys.
remoteClusterAPIKeys, err := apiKeyStoreSecretSource(ctx, &d.ES, d.Client)
if err != nil {
return results.WithError(err)
}
keystoreResources, err := keystore.ReconcileResources(
ctx,
d,
&d.ES,
esv1.ESNamer,
label.NewLabels(k8s.ExtractNamespacedName(&d.ES)),
keystoreParams,
remoteClusterAPIKeys...,
)
if err != nil {
return results.WithError(err)
}
// set an annotation with the ClusterUUID, if bootstrapped
requeue, err := bootstrap.ReconcileClusterUUID(ctx, d.Client, &d.ES, esClient, esReachable)
if err != nil {
return results.WithError(err)
}
if requeue {
results = results.WithReconciliationState(defaultRequeue.WithReason("Elasticsearch cluster UUID is not reconciled"))
}
// reconcile Beats config Secrets if Stack Monitoring is defined
err = stackmon.ReconcileConfigSecrets(ctx, d.Client, d.ES)
if err != nil {
return results.WithError(err)
}
// requeue if associations are defined but not yet configured; otherwise we may deploy Elasticsearch Pods once,
// then change their spec a few seconds later once the association is configured
areAssocsConfigured, err := association.AreConfiguredIfSet(ctx, d.ES.GetAssociations(), d.Recorder())
if err != nil {
return results.WithError(err)
}
if !areAssocsConfigured {
results.WithReconciliationState(defaultRequeue.WithReason("Some associations are not reconciled"))
}
// reconcile suspended Pods before reconciling node specs: Pod suspension is a debugging and troubleshooting tool
// that does not follow the change budget restrictions
if err := reconcileSuspendedPods(ctx, d.Client, d.ES, d.Expectations); err != nil {
return results.WithError(err)
}
// reconcile StatefulSets and nodes configuration
return results.WithResults(d.reconcileNodeSpecs(ctx, esReachable, esClient, d.ReconcileState, *resourcesState, keystoreResources))
}