pkg/controller/elasticsearch/driver/driver.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package driver

import (
	"context"
	"crypto/x509"
	"fmt"
	"strings"
	"time"

	"github.com/pkg/errors"
	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/ptr"
	controller "sigs.k8s.io/controller-runtime/pkg/reconcile"

	commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1"
	esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/association"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
	commondriver "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/driver"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/keystore"
	commonlicense "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/license"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/bootstrap"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/certificates"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/cleanup"
	esclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/configmap"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/filesettings"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/hints"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/initcontainer"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/license"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/observer"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/remotecluster"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/securitycontext"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/services"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/stackmon"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/user"
	"github.com/elastic/cloud-on-k8s/v3/pkg/dev"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
	ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/optional"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/set"
)

var (
	defaultRequeue = reconciler.ReconciliationState{Result: controller.Result{Requeue: true, RequeueAfter: 10 * time.Second}}
)

// Driver orchestrates the reconciliation of an Elasticsearch resource.
// Its lifecycle is bound to a single reconciliation attempt.
type Driver interface {
	Reconcile(context.Context) *reconciler.Results
}
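
// A minimal sketch of the intended call pattern from the owning controller.
// `params` is a hypothetical, fully populated DefaultDriverParameters value,
// and the final aggregation step assumes the reconciler.Results helpers used
// throughout this package:
//
//	drv := NewDefaultDriver(params)
//	results := drv.Reconcile(ctx)
//	result, err := results.Aggregate()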

// NewDefaultDriver returns the default driver implementation.
func NewDefaultDriver(parameters DefaultDriverParameters) Driver {
	return &defaultDriver{DefaultDriverParameters: parameters}
}

// DefaultDriverParameters contain parameters for this driver.
// Most of them are persisted across driver creations.
type DefaultDriverParameters struct {
	// OperatorParameters contain global parameters about the operator.
	OperatorParameters operator.Parameters

	// ES is the Elasticsearch resource to reconcile
	ES esv1.Elasticsearch
	// SupportedVersions verifies whether we can support upgrading from the current pods.
	SupportedVersions version.MinMaxVersion
	// Version is the version of Elasticsearch we want to reconcile towards.
	Version version.Version
	// Client is used to access the Kubernetes API.
	Client   k8s.Client
	Recorder record.EventRecorder
	// LicenseChecker is used for some features to check if an appropriate license is set up.
	LicenseChecker commonlicense.Checker

	// State holds the accumulated state during the reconcile loop
	ReconcileState *reconcile.State
	// Observers that observe es clusters state.
	Observers *observer.Manager
	// DynamicWatches are handles to currently registered dynamic watches.
	DynamicWatches watches.DynamicWatches
	// Expectations control some expectations set on resources in the cache, in order to
	// avoid doing certain operations if the cache hasn't seen an up-to-date resource yet.
	Expectations *expectations.Expectations
}

// defaultDriver is the default Driver implementation
type defaultDriver struct {
	DefaultDriverParameters
}

func (d *defaultDriver) K8sClient() k8s.Client {
	return d.Client
}

func (d *defaultDriver) DynamicWatches() watches.DynamicWatches {
	return d.DefaultDriverParameters.DynamicWatches
}

func (d *defaultDriver) Recorder() record.EventRecorder {
	return d.DefaultDriverParameters.Recorder
}

var _ commondriver.Interface = &defaultDriver{}

// Reconcile fulfills the Driver interface and reconciles the cluster resources.
func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
	results := reconciler.NewResult(ctx)
	log := ulog.FromContext(ctx)

	// garbage collect secrets attached to this cluster that we don't need anymore
	if err := cleanup.DeleteOrphanedSecrets(ctx, d.Client, d.ES); err != nil {
		return results.WithError(err)
	}

	if err := configmap.ReconcileScriptsConfigMap(ctx, d.Client, d.ES); err != nil {
		return results.WithError(err)
	}

	_, err := common.ReconcileService(ctx, d.Client, services.NewTransportService(d.ES), &d.ES)
	if err != nil {
		return results.WithError(err)
	}

	externalService, err := common.ReconcileService(ctx, d.Client, services.NewExternalService(d.ES), &d.ES)
	if err != nil {
		if k8serrors.IsAlreadyExists(err) {
			return results.WithReconciliationState(defaultRequeue.WithReason(fmt.Sprintf("Pending %s service recreation", services.ExternalServiceName(d.ES.Name))))
		}
		return results.WithError(err)
	}

	var internalService *corev1.Service
	internalService, err = common.ReconcileService(ctx, d.Client, services.NewInternalService(d.ES), &d.ES)
	if err != nil {
		return results.WithError(err)
	}
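
	// At this point the three core Services for the cluster have been
	// reconciled: transport (node-to-node traffic), external HTTP
	// (client-facing) and internal HTTP (used by the operator itself).
	// These are conventionally named <cluster>-es-transport, <cluster>-es-http
	// and <cluster>-es-internal-http respectively, though the services
	// package is the authoritative source for the naming scheme.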

	// Remote Cluster Server (RCS2) Kubernetes Service reconciliation.
	if d.ES.Spec.RemoteClusterServer.Enabled {
		// Remote Cluster Server is enabled, ensure that the related Kubernetes Service does exist.
		if _, err := common.ReconcileService(ctx, d.Client, services.NewRemoteClusterService(d.ES), &d.ES); err != nil {
			results.WithError(err)
		}
	} else {
		// Ensure that the remote cluster Service does not exist.
		remoteClusterService := &corev1.Service{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: d.ES.Namespace,
				Name:      services.RemoteClusterServiceName(d.ES.Name),
			},
		}
		results.WithError(k8s.DeleteResourceIfExists(ctx, d.Client, remoteClusterService))
	}

	resourcesState, err := reconcile.NewResourcesStateFromAPI(d.Client, d.ES)
	if err != nil {
		return results.WithError(err)
	}
	warnUnsupportedDistro(resourcesState.AllPods, d.ReconcileState.Recorder)

	controllerUser, err := user.ReconcileUsersAndRoles(ctx, d.Client, d.ES, d.DynamicWatches(), d.Recorder(), d.OperatorParameters.PasswordHasher)
	if err != nil {
		return results.WithError(err)
	}

	trustedHTTPCertificates, res := certificates.ReconcileHTTP(
		ctx,
		d,
		d.ES,
		[]corev1.Service{*externalService, *internalService},
		d.OperatorParameters.GlobalCA,
		d.OperatorParameters.CACertRotation,
		d.OperatorParameters.CertRotation,
	)
	results.WithResults(res)
	if res != nil && res.HasError() {
		return results
	}

	// start the ES observer
	minVersion, err := version.MinInPods(resourcesState.CurrentPods, label.VersionLabelName)
	if err != nil {
		return results.WithError(err)
	}
	if minVersion == nil {
		minVersion = &d.Version
	}

	urlProvider := services.NewElasticsearchURLProvider(d.ES, d.Client)
	hasEndpoints := urlProvider.HasEndpoints()

	observedState := d.Observers.ObservedStateResolver(
		ctx,
		d.ES,
		d.elasticsearchClientProvider(
			ctx,
			urlProvider,
			controllerUser,
			*minVersion,
			trustedHTTPCertificates,
		),
		hasEndpoints,
	)

	// Always update the Elasticsearch state bits with the latest observed state.
	d.ReconcileState.
		UpdateClusterHealth(observedState()).         // Elasticsearch cluster health
		UpdateAvailableNodes(*resourcesState).        // Available nodes
		UpdateMinRunningVersion(ctx, *resourcesState) // Min running version
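
	// Note that observedState is a closure, not a snapshot: each call appears
	// to return the health most recently recorded by the observer, which is
	// why it is invoked again further down to compute reachability. A hedged
	// sketch of the idea:
	//
	//	health := observedState() // e.g. esv1.ElasticsearchGreenHealth, or
	//	                          // esv1.ElasticsearchUnknownHealth when ES is not responding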

	res = certificates.ReconcileTransport(
		ctx,
		d,
		d.ES,
		d.OperatorParameters.GlobalCA,
		d.OperatorParameters.CACertRotation,
		d.OperatorParameters.CertRotation,
	)
	results.WithResults(res)
	if res != nil && res.HasError() {
		return results
	}

	// Patch the Pods to add the expected node labels as annotations. Record the error, if any, but do not stop the
	// reconciliation loop as we don't want to prevent other updates from being applied to the cluster.
	results.WithResults(annotatePodsWithNodeLabels(ctx, d.Client, d.ES))

	if err := d.verifySupportsExistingPods(resourcesState.CurrentPods); err != nil {
		if !d.ES.IsConfiguredToAllowDowngrades() {
			return results.WithError(err)
		}
		log.Info("Allowing downgrade on user request", "warning", err.Error())
	}

	// TODO: support user-supplied certificate (non-ca)
	esClient := d.newElasticsearchClient(
		ctx,
		urlProvider,
		controllerUser,
		*minVersion,
		trustedHTTPCertificates,
	)
	defer esClient.Close()

	// use unknown health as a proxy for a cluster not responding to requests
	hasKnownHealthState := observedState() != esv1.ElasticsearchUnknownHealth
	esReachable := hasEndpoints && hasKnownHealthState

	// report the reachability condition in the Elasticsearch resource status
	if esReachable {
		d.ReconcileState.ReportCondition(esv1.ElasticsearchIsReachable, corev1.ConditionTrue, esReachableConditionMessage(internalService, hasEndpoints, hasKnownHealthState))
	} else {
		d.ReconcileState.ReportCondition(esv1.ElasticsearchIsReachable, corev1.ConditionFalse, esReachableConditionMessage(internalService, hasEndpoints, hasKnownHealthState))
	}
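
	// The reachability decision reported just above, summarized (see
	// esReachableConditionMessage at the bottom of this file):
	//
	//	hasEndpoints  hasKnownHealthState  esReachable  condition message
	//	false         any                  false        Service has no endpoint
	//	true          false                false        Service has endpoints but Elasticsearch is unavailable
	//	true          true                 true         Service has endpoints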

	var currentLicense esclient.License
	if esReachable {
		currentLicense, err = license.CheckElasticsearchLicense(ctx, esClient)
		var e *license.GetLicenseError
		if errors.As(err, &e) {
			if !e.SupportedDistribution {
				msg := "Unsupported Elasticsearch distribution"
				// unsupported distribution, let's update the phase to "invalid" and stop the reconciliation
				d.ReconcileState.
					UpdateWithPhase(esv1.ElasticsearchResourceInvalid).
					AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, fmt.Sprintf("%s: %s", msg, err.Error()))
				return results.WithError(errors.Wrap(err, strings.ToLower(msg[0:1])+msg[1:]))
			}
			// update esReachable to bypass steps that require ES to be up, so as not to block reconciliation for long periods
			esReachable = e.EsReachable
		}
		if err != nil {
			msg := "Could not verify license, re-queuing"
			log.Info(msg, "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
			d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, fmt.Sprintf("%s: %s", msg, err.Error()))
			results.WithReconciliationState(defaultRequeue.WithReason(msg))
		}
	}

	// Update the service account orchestration hint. This is done early in the reconciliation loop to unblock association
	// controllers that may be waiting for the orchestration hint.
	results.WithError(d.maybeSetServiceAccountsOrchestrationHint(ctx, esReachable, esClient, resourcesState))

	// reconcile the Elasticsearch license (even if we assume the cluster might not respond to requests to cover the case of
	// expired licenses where all health API responses are 403)
	if hasEndpoints {
		err = license.Reconcile(ctx, d.Client, d.ES, esClient, currentLicense)
		if err != nil {
			msg := "Could not reconcile cluster license, re-queuing"
			// only log an event if Elasticsearch is in a state where success of this API call can be expected.
			// The API call itself will be logged by the client.
			if hasKnownHealthState {
				log.Info(msg, "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
				d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, fmt.Sprintf("%s: %s", msg, err.Error()))
			}
			results.WithReconciliationState(defaultRequeue.WithReason(msg))
		}
	}

	// reconcile remote clusters
	if esReachable {
		requeue, err := remotecluster.UpdateSettings(ctx, d.Client, esClient, d.Recorder(), d.LicenseChecker, d.ES)
		msg := "Could not update remote clusters in Elasticsearch settings, re-queuing"
		if err != nil {
			log.Info(msg, "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
			d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, msg)
			results.WithError(err)
		}
		if requeue {
			results.WithReconciliationState(defaultRequeue.WithReason("Updating remote cluster settings, re-queuing"))
		}
	}

	// Compute seed hosts based on current masters with a podIP
	if err := settings.UpdateSeedHostsConfigMap(ctx, d.Client, d.ES, resourcesState.AllPods); err != nil {
		return results.WithError(err)
	}

	// reconcile an empty file-based settings Secret if it doesn't exist
	if d.Version.GTE(filesettings.FileBasedSettingsMinPreVersion) {
		err = filesettings.ReconcileEmptyFileSettingsSecret(ctx, d.Client, d.ES, true)
		if err != nil {
			return results.WithError(err)
		}
	}

	keystoreParams := initcontainer.KeystoreParams
	keystoreSecurityContext := securitycontext.For(d.Version, true)
	keystoreParams.SecurityContext = &keystoreSecurityContext

	// Set up a keystore with secure settings in an init container, if specified by the user.
	// We are also using the keystore internally for the remote cluster API keys.
	remoteClusterAPIKeys, err := apiKeyStoreSecretSource(ctx, &d.ES, d.Client)
	if err != nil {
		return results.WithError(err)
	}
	keystoreResources, err := keystore.ReconcileResources(
		ctx,
		d,
		&d.ES,
		esv1.ESNamer,
		label.NewLabels(k8s.ExtractNamespacedName(&d.ES)),
		keystoreParams,
		remoteClusterAPIKeys...,
	)
	if err != nil {
		return results.WithError(err)
	}

	// set an annotation with the ClusterUUID, if bootstrapped
	requeue, err := bootstrap.ReconcileClusterUUID(ctx, d.Client, &d.ES, esClient, esReachable)
	if err != nil {
		return results.WithError(err)
	}
	if requeue {
		results = results.WithReconciliationState(defaultRequeue.WithReason("Elasticsearch cluster UUID is not reconciled"))
	}

	// reconcile beats config secrets if Stack Monitoring is defined
	err = stackmon.ReconcileConfigSecrets(ctx, d.Client, d.ES)
	if err != nil {
		return results.WithError(err)
	}

	// requeue if associations are defined but not yet configured, otherwise we may be in a situation where we deploy
	// Elasticsearch Pods once, then change their spec a few seconds later once the association is configured
	areAssocsConfigured, err := association.AreConfiguredIfSet(ctx, d.ES.GetAssociations(), d.Recorder())
	if err != nil {
		return results.WithError(err)
	}
	if !areAssocsConfigured {
		results.WithReconciliationState(defaultRequeue.WithReason("Some associations are not reconciled"))
	}

	// we want to reconcile suspended Pods before we start reconciling node specs as this is considered a debugging and
	// troubleshooting tool that does not follow the change budget restrictions
	if err := reconcileSuspendedPods(ctx, d.Client, d.ES, d.Expectations); err != nil {
		return results.WithError(err)
	}

	// reconcile StatefulSets and nodes configuration
	return results.WithResults(d.reconcileNodeSpecs(ctx, esReachable, esClient, d.ReconcileState, *resourcesState, keystoreResources))
}
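
// A hedged sketch of how Reconcile consumes the helper below (error handling
// elided, variable names illustrative):
//
//	sources, err := apiKeyStoreSecretSource(ctx, &d.ES, d.Client)
//	// sources is nil when the remote API keys Secret does not exist;
//	// otherwise it is passed to keystore.ReconcileResources as an
//	// additional secure-settings source.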

// apiKeyStoreSecretSource returns the Secret that holds the remote API keys, and which should be used as a secure settings source.
func apiKeyStoreSecretSource(ctx context.Context, es *esv1.Elasticsearch, c k8s.Client) ([]commonv1.NamespacedSecretSource, error) {
	// Check if the Secret exists.
	secretName := types.NamespacedName{
		Name:      esv1.RemoteAPIKeysSecretName(es.Name),
		Namespace: es.Namespace,
	}
	if err := c.Get(ctx, secretName, &corev1.Secret{}); err != nil {
		if k8serrors.IsNotFound(err) {
			return nil, nil
		}
		return nil, err
	}
	return []commonv1.NamespacedSecretSource{
		{
			Namespace:  es.Namespace,
			SecretName: secretName.Name,
		},
	}, nil
}

// newElasticsearchClient creates a new Elasticsearch HTTP client for this cluster using the provided user
func (d *defaultDriver) newElasticsearchClient(
	ctx context.Context,
	urlProvider esclient.URLProvider,
	user esclient.BasicAuth,
	v version.Version,
	caCerts []*x509.Certificate,
) esclient.Client {
	return esclient.NewElasticsearchClient(
		d.OperatorParameters.Dialer,
		k8s.ExtractNamespacedName(&d.ES),
		urlProvider,
		user,
		v,
		caCerts,
		esclient.Timeout(ctx, d.ES),
		dev.Enabled,
	)
}

// elasticsearchClientProvider returns a function that reuses the given Elasticsearch client if it still matches
// the expected properties, or creates a new one otherwise.
func (d *defaultDriver) elasticsearchClientProvider(
	ctx context.Context,
	urlProvider esclient.URLProvider,
	user esclient.BasicAuth,
	v version.Version,
	caCerts []*x509.Certificate,
) func(existingEsClient esclient.Client) esclient.Client {
	return func(existingEsClient esclient.Client) esclient.Client {
		if existingEsClient != nil && existingEsClient.HasProperties(v, user, urlProvider, caCerts) {
			return existingEsClient
		}
		return d.newElasticsearchClient(ctx, urlProvider, user, v, caCerts)
	}
}

// maybeSetServiceAccountsOrchestrationHint attempts to update an orchestration hint to let the association controllers
// know whether all the nodes in the cluster are ready to authenticate service accounts.
func (d *defaultDriver) maybeSetServiceAccountsOrchestrationHint(
	ctx context.Context,
	esReachable bool,
	securityClient esclient.SecurityClient,
	resourcesState *reconcile.ResourcesState,
) error {
	if d.ReconcileState.OrchestrationHints().ServiceAccounts.IsTrue() {
		// Orchestration hint is already set to true, there is no point going back to false.
		return nil
	}

	// Case 1: New cluster, we can immediately set the orchestration hint.
	if !bootstrap.AnnotatedForBootstrap(d.ES) {
		allNodesRunningServiceAccounts, err := esv1.AreServiceAccountsSupported(d.ES.Spec.Version)
		if err != nil {
			return err
		}
		d.ReconcileState.UpdateOrchestrationHints(
			d.ReconcileState.OrchestrationHints().Merge(hints.OrchestrationsHints{ServiceAccounts: optional.NewBool(allNodesRunningServiceAccounts)}),
		)
		return nil
	}

	// Case 2: This is an existing cluster, but the actual cluster version does not support service accounts.
	if d.ES.Status.Version == "" {
		return nil
	}
	supportServiceAccounts, err := esv1.AreServiceAccountsSupported(d.ES.Status.Version)
	if err != nil {
		return err
	}
	if !supportServiceAccounts {
		d.ReconcileState.UpdateOrchestrationHints(
			d.ReconcileState.OrchestrationHints().Merge(hints.OrchestrationsHints{ServiceAccounts: optional.NewBool(false)}),
		)
		return nil
	}
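
	// The comparison in Case 3 below relies on the Elasticsearch service
	// accounts API; a rough sketch of the call (the exact response shape is
	// documented by Elasticsearch, not here):
	//
	//	GET /_security/service/<namespace>/<service>/credential
	//
	// The response lists, for file-backed tokens, the nodes that have loaded
	// them from conf/service_tokens, which is what credentials.Nodes()
	// surfaces in allNodesRunningServiceAccounts.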

	// Case 3: the cluster is already running a version that does support service accounts, and tokens have already been created.
	// We don't however know if all nodes have been migrated and are running with the service_tokens file mounted from the configuration Secret.
	// Let's try to detect that situation by comparing the existing nodes and the ones returned by the /_security/service API.
	// Note that starting with release 2.3 the association controller does not create the service account token until Elasticsearch is annotated
	// as compatible with service accounts. This is mostly to unblock the situation described in https://github.com/elastic/cloud-on-k8s/issues/5684
	if !esReachable {
		// This requires the Elasticsearch API to be available.
		return nil
	}

	allPods := names(resourcesState.AllPods)
	log := ulog.FromContext(ctx)

	// Detect if some service tokens are expected.
	saTokens, err := user.GetServiceAccountTokens(d.Client, d.ES)
	if err != nil {
		log.Info("Could not detect if service accounts are expected", "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
		return err
	}

	allNodesRunningServiceAccounts, err := allNodesRunningServiceAccounts(ctx, saTokens, set.Make(allPods...), securityClient)
	if err != nil {
		log.Info("Could not detect if all nodes are ready for using service accounts", "err", err, "namespace", d.ES.Namespace, "es_name", d.ES.Name)
		return err
	}
	if allNodesRunningServiceAccounts != nil {
		d.ReconcileState.UpdateOrchestrationHints(
			d.ReconcileState.OrchestrationHints().Merge(hints.OrchestrationsHints{ServiceAccounts: optional.NewBool(*allNodesRunningServiceAccounts)}),
		)
	}

	return nil
}

// allNodesRunningServiceAccounts attempts to detect if all the nodes in the cluster have loaded the service_tokens file.
// It returns nil if no decision can be made, for example when no tokens are expected to exist.
func allNodesRunningServiceAccounts(
	ctx context.Context,
	saTokens user.ServiceAccountTokens,
	allPods set.StringSet,
	securityClient esclient.SecurityClient,
) (*bool, error) {
	if len(allPods) == 0 {
		return nil, nil
	}
	if len(saTokens) == 0 {
		// No tokens are expected: we cannot call the Elasticsearch API to detect which nodes are
		// running with the conf/service_tokens file.
		return nil, nil
	}

	// Get the namespaced service name to call the /_security/service/<namespace>/<service>/credential API.
	namespacedServices := saTokens.NamespacedServices()
	// Get the nodes which have loaded tokens from the conf/service_tokens file.
	for namespacedService := range namespacedServices {
		credentials, err := securityClient.GetServiceAccountCredentials(ctx, namespacedService)
		if err != nil {
			return nil, err
		}
		diff := allPods.Diff(credentials.Nodes())
		if len(diff) == 0 {
			return ptr.To[bool](true), nil
		}
	}
	// Some nodes are running but did not show up in the security API.
	return ptr.To[bool](false), nil
}
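
// A summary of the tri-state contract above:
//
//	nil   -> undecided: no Pods, or no tokens expected
//	true  -> every Pod shows up in the credentials' node list for some service
//	false -> at least one Pod has not loaded conf/service_tokens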

// warnUnsupportedDistro sends an event of type warning if the Elasticsearch Docker image is not a supported
// distribution, detected by checking whether the prepare-fs init container terminated with the UnsupportedDistro exit code.
func warnUnsupportedDistro(pods []corev1.Pod, recorder *events.Recorder) {
	for _, p := range pods {
		for _, s := range p.Status.InitContainerStatuses {
			state := s.LastTerminationState.Terminated
			if s.Name == initcontainer.PrepareFilesystemContainerName &&
				state != nil && state.ExitCode == initcontainer.UnsupportedDistroExitCode {
				recorder.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, "Unsupported distribution")
			}
		}
	}
}

// esReachableConditionMessage returns a human-readable message explaining why Elasticsearch is, or is not,
// reachable through the given internal Service.
func esReachableConditionMessage(internalService *corev1.Service, isServiceReady bool, isRespondingToRequests bool) string {
	switch {
	case !isServiceReady:
		return fmt.Sprintf("Service %s/%s has no endpoint", internalService.Namespace, internalService.Name)
	case !isRespondingToRequests:
		return fmt.Sprintf("Service %s/%s has endpoints but Elasticsearch is unavailable", internalService.Namespace, internalService.Name)
	default:
		return fmt.Sprintf("Service %s/%s has endpoints", internalService.Namespace, internalService.Name)
	}
}