pkg/controller/association/reconciler.go (419 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package association import ( "context" "fmt" "hash" "hash/fnv" "reflect" "time" "github.com/pkg/errors" "go.elastic.co/apm/v2" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/elastic/cloud-on-k8s/v3/pkg/about" commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/annotation" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/name" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/hints" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/user" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/maps" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/rbac" ) var ( defaultRequeue = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second} ) // AssociationInfo contains information specific to a particular associated resource (eg. Kibana, APMServer, etc.). type AssociationInfo struct { //nolint:revive // AssociationType identifies the type of the resource for association (eg. 
kibana for APM to Kibana association, // elasticsearch for Beat to Elasticsearch association) AssociationType commonv1.AssociationType // AssociatedObjTemplate builds an empty typed associated object (eg. &Kibana{} for a Kibana to Elasticsearch association). AssociatedObjTemplate func() commonv1.Associated // ReferencedObjTemplate builds an empty referenced object (e.g. Elasticsearch{} for a Kibana to Elasticsearch association). ReferencedObjTemplate func() client.Object // ReferencedResourceNamer is used to build the name of the Secret which contains the CA of the referenced resource // (the Elasticsearch Namer for a Kibana to Elasticsearch association). ReferencedResourceNamer name.Namer // ExternalServiceURL is used to build the external service url as it will be set in the resource configuration. ExternalServiceURL func(c k8s.Client, association commonv1.Association) (string, error) // AssociationName is the name of the association (eg. "kb-es"). AssociationName string // AssociatedShortName is the short name of the associated resource type (eg. "kb"). AssociatedShortName string // AdditionalSecrets are additional secrets to copy from an association's namespace to the associated resource namespace. // Currently this is only used for copying the CA from an Elasticsearch association to the same namespace as // an Agent referencing a Fleet Server. AdditionalSecrets func(context.Context, k8s.Client, commonv1.Association) ([]types.NamespacedName, error) // Labels are labels set on all resources created for association purpose. Note that some resources will be also // labelled with AssociationResourceNameLabelName and AssociationResourceNamespaceLabelName in addition to any // labels provided here. Labels func(associated types.NamespacedName) map[string]string // AssociationConfAnnotationNameBase is prefix of the name of the annotation used to define the config for the // associated resource. 
The annotation is used by the association controller to store the configuration and by // the controller which is managing the associated resource to build the appropriate configuration. The annotation // base is used to recognize annotations eligible for removal when association is removed. AssociationConfAnnotationNameBase string // ReferencedResourceVersion returns the currently running version of the referenced resource. // It may return an empty string if the version is unknown. A boolean is also returned, set to true if the referenced // resource is a serverless project running in https://cloud.elastic.co/ ReferencedResourceVersion func(c k8s.Client, association commonv1.Association) (string, bool, error) // AssociationResourceNameLabelName is a label used on resources needed for an association. It identifies the name // of the associated resource (eg. user secret allowing to connect Beat to Kibana will have this label pointing to the // Beat resource). AssociationResourceNameLabelName string // AssociationResourceNamespaceLabelName is a label used on resources needed for an association. It identifies the // namespace of the associated resource (eg. user secret allowing to connect Beat to Kibana will have this label // pointing to the Beat resource). AssociationResourceNamespaceLabelName string // ElasticsearchUserCreation specifies settings to create an Elasticsearch user as part of the association. // May be nil if no user creation is required. ElasticsearchUserCreation *ElasticsearchUserCreation } type ElasticsearchUserCreation struct { // ElasticsearchRef is a function which returns the maybe transitive Elasticsearch reference (eg. APMServer -> Kibana -> Elasticsearch). // In the case of a transitive reference this is used to create the Elasticsearch user. 
ElasticsearchRef func(c k8s.Client, association commonv1.Association) (bool, commonv1.ObjectSelector, error) // UserSecretSuffix is used as a suffix in the name of the secret holding user data in the associated namespace. UserSecretSuffix string // ESUserRole is the role to use for the Elasticsearch user created by the association. ESUserRole func(commonv1.Associated) (string, error) } // AssociationResourceLabels returns all labels required by a resource to allow identifying both its Associated resource // (ie. Kibana for Kibana-ES association) and its Association resource (ie. ES for APM-ES association). func (a AssociationInfo) AssociationResourceLabels( associated types.NamespacedName, association types.NamespacedName, ) client.MatchingLabels { return maps.Merge( map[string]string{ a.AssociationResourceNameLabelName: association.Name, a.AssociationResourceNamespaceLabelName: association.Namespace, }, a.Labels(associated), ) } // userLabelSelector returns labels selecting the ES user secret, including association labels and user type label. func (a AssociationInfo) userLabelSelector( associated types.NamespacedName, association types.NamespacedName, ) client.MatchingLabels { return maps.Merge( map[string]string{commonv1.TypeLabelName: user.AssociatedUserType}, a.AssociationResourceLabels(associated, association), ) } // Reconciler reconciles a generic association for a specific AssociationInfo. 
type Reconciler struct {
	// AssociationInfo describes the kind of association handled by this reconciler.
	AssociationInfo

	// Client is the Kubernetes client used for every read and write performed by the reconciler.
	k8s.Client
	// accessReviewer is handed to CheckAndUnbind and RequeueRbacCheck.
	accessReviewer rbac.AccessReviewer
	// recorder is used to emit Kubernetes events on the associated resource.
	recorder record.EventRecorder
	watches  watches.DynamicWatches
	operator.Parameters
	// iteration is the number of times this controller has run its Reconcile method
	iteration uint64
}

// Reconcile handles a reconcile request for one associated resource.
//
// It loads the associated object named by the request, keeps only its associations whose type matches
// r.AssociationType, reconciles each of them and finally writes the resulting per-association statuses back to the
// associated resource. If the associated object no longer exists, watches and leftover resources are cleaned up
// through onDelete. Unmanaged objects and objects with a deletion timestamp are skipped.
//
// Errors from individual associations are accumulated in the aggregated result; a status update is attempted even
// when some associations failed.
func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	nameField := fmt.Sprintf("%s_name", r.AssociatedShortName)
	ctx = common.NewReconciliationContext(ctx, &r.iteration, r.Tracer, r.AssociationName, nameField, request)
	defer common.LogReconciliationRun(ulog.FromContext(ctx))()
	defer tracing.EndContextTransaction(ctx)
	log := ulog.FromContext(ctx)

	associated := r.AssociatedObjTemplate()
	if err := r.Client.Get(ctx, request.NamespacedName, associated); err != nil {
		if apierrors.IsNotFound(err) {
			// object resource has been deleted, remove artifacts related to the association.
			r.onDelete(ctx, types.NamespacedName{
				Namespace: request.Namespace,
				Name:      request.Name,
			})
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	associatedKey := k8s.ExtractNamespacedName(associated)

	if common.IsUnmanaged(ctx, associated) {
		log.Info("Object is currently not managed by this controller. Skipping reconciliation")
		return reconcile.Result{}, nil
	}

	if !associated.GetDeletionTimestamp().IsZero() {
		// Object is being deleted, short-circuit reconciliation
		return reconcile.Result{}, nil
	}

	if err := RemoveObsoleteAssociationConfs(ctx, r.Client, associated, r.AssociationConfAnnotationNameBase); err != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	// we are only interested in associations of the same target type here
	// (e.g. Kibana -> Enterprise Search, not Kibana -> Elasticsearch)
	associations := make([]commonv1.Association, 0)
	for _, association := range associated.GetAssociations() {
		if association.AssociationType() == r.AssociationType {
			associations = append(associations, association)
		}
	}

	// garbage collect leftover resources that are not required anymore
	// (best effort: a failure is logged and reconciliation continues)
	if err := deleteOrphanedResources(ctx, r.Client, r.AssociationInfo, associatedKey, associations); err != nil {
		log.Error(err, "Error while trying to delete orphaned resources. Continuing.")
	}

	// reconcile watches for all associations of this type
	if err := r.reconcileWatches(ctx, associatedKey, associations); err != nil {
		return reconcile.Result{}, tracing.CaptureError(ctx, err)
	}

	results := reconciler.NewResult(ctx)
	// newStatusMap is keyed by the string form of each association reference's namespaced name.
	newStatusMap := commonv1.AssociationStatusMap{}
	for _, association := range associations {
		newStatus, err := r.reconcileAssociation(ctx, association)
		if err != nil {
			results.WithError(err)
		}
		// the status is recorded even when reconcileAssociation returned an error
		newStatusMap[association.AssociationRef().NamespacedName().String()] = newStatus
	}

	// we want to attempt a status update even in the presence of errors
	if err := r.updateStatus(ctx, associated, newStatusMap); err != nil && apierrors.IsConflict(err) {
		// a conflict is not reported as an error: requeue immediately and try again
		log.V(1).Info(
			"Conflict while updating status",
			"namespace", associatedKey.Namespace,
			"name", associatedKey.Name)
		return results.WithResult(reconcile.Result{Requeue: true}).Aggregate()
	} else if err != nil {
		return defaultRequeue, tracing.CaptureError(ctx, errors.Wrapf(err, "while updating status"))
	}
	return results.
		WithResult(RequeueRbacCheck(r.accessReviewer)).
		WithResult(resultFromStatuses(newStatusMap)).
		Aggregate()
}

// reconcileAssociation reconciles a single association and returns the status to record for it.
//
// The steps, in order:
//   - check that the referenced object (an Elastic resource, or a Secret for an external reference) exists;
//     if not, the association conf is removed and the status is Pending
//   - for an external reference, build the conf from the unmanaged association and stop there
//   - reconcile the CA secret, copy any additional secrets, resolve the service URL and the referenced version
//   - if no Elasticsearch user is required, store the conf with the "no auth required" marker
//   - otherwise resolve the (maybe transitive) Elasticsearch reference, check that the association is allowed,
//     then create either service account secrets or a user secret and store the resulting conf
//
// A non-nil error may be returned together with a Pending or Failed status.
func (r *Reconciler) reconcileAssociation(ctx context.Context, association commonv1.Association) (commonv1.AssociationStatus, error) {
	assocRef := association.AssociationRef()
	log := ulog.FromContext(ctx)

	// the referenced object can be an Elastic resource or a custom Secret
	referencedObj := r.ReferencedObjTemplate()
	if assocRef.IsExternal() {
		referencedObj = &corev1.Secret{}
	}

	// check if the referenced object exists
	exists, err := k8s.ObjectExists(r.Client, assocRef.NamespacedName(), referencedObj)
	if err != nil {
		return commonv1.AssociationFailed, err
	}
	if !exists {
		// the referenced object does not exist (yet), set status to Pending and remove the existing association conf
		return commonv1.AssociationPending, RemoveAssociationConf(ctx, r.Client, association)
	}

	if assocRef.IsExternal() {
		log.V(1).Info("Association with an unmanaged resource", "name", association.Associated().GetName(), "ref_name", assocRef.Name)
		// external reference, update association conf to associate the unmanaged resource
		expectedAssocConf, err := r.ExpectedConfigFromUnmanagedAssociation(association)
		if err != nil {
			r.recorder.Eventf(association.Associated(), corev1.EventTypeWarning, events.EventAssociationError, "Failed to reconcile external resource %q: %v", assocRef.NameOrSecretName(), err.Error())
			return commonv1.AssociationFailed, err
		}
		return r.updateAssocConf(ctx, &expectedAssocConf, association)
	}

	caSecret, err := r.ReconcileCASecret(
		ctx,
		association,
		r.AssociationInfo.ReferencedResourceNamer,
		assocRef.NamespacedName(),
	)
	if err != nil {
		return commonv1.AssociationPending, err // maybe not created yet
	}

	// secretsHash stays nil when no AdditionalSecrets function is configured; in that case no hash is stored
	// in the association conf below.
	var secretsHash hash.Hash32
	if r.AdditionalSecrets != nil {
		secretsHash = fnv.New32a()
		additionalSecrets, err := r.AdditionalSecrets(ctx, r.Client, association)
		if err != nil {
			return commonv1.AssociationPending, err // maybe not created yet
		}
		for _, sec := range additionalSecrets {
			if err := copySecret(ctx, r.Client, secretsHash, association.GetNamespace(), sec); err != nil {
				return commonv1.AssociationPending, err
			}
		}
	}

	url, err := r.AssociationInfo.ExternalServiceURL(r.Client, association)
	if err != nil {
		// the Service may not have been created by the resource controller yet
		if apierrors.IsNotFound(err) {
			log.Info("Associated resource Service is not available yet", "error", err, "name", association.Associated().GetName(), "ref_name", assocRef.Name)
			return commonv1.AssociationPending, nil
		}
		return commonv1.AssociationPending, err
	}

	// propagate the currently running version of the referenced resource (example: Elasticsearch version).
	// The Kibana controller (for example) can then delay a Kibana version upgrade if Elasticsearch is not upgraded yet.
	ver, isServerless, err := r.ReferencedResourceVersion(r.Client, association)
	if err != nil {
		return commonv1.AssociationPending, err
	}

	// construct the expected association configuration
	expectedAssocConf := &commonv1.AssociationConf{
		CACertProvided: caSecret.CACertProvided,
		CASecretName:   caSecret.Name,
		URL:            url,
		Version:        ver,
		Serverless:     isServerless,
	}
	if secretsHash != nil {
		expectedAssocConf.AdditionalSecretsHash = fmt.Sprint(secretsHash.Sum32())
	}

	if r.ElasticsearchUserCreation == nil {
		// no user creation required, update the association conf as such
		expectedAssocConf.AuthSecretName = commonv1.NoAuthRequiredValue
		return r.updateAssocConf(ctx, expectedAssocConf, association)
	}

	// since Elasticsearch can be a transitive reference we need to use the provided ElasticsearchRef function
	found, esAssocRef, err := r.ElasticsearchUserCreation.ElasticsearchRef(r.Client, association)
	if err != nil {
		return commonv1.AssociationFailed, err
	}
	// the Elasticsearch ref does not exist yet, set status to Pending
	if !found {
		return commonv1.AssociationPending, RemoveAssociationConf(ctx, r.Client, association)
	}

	if esAssocRef.IsExternal() {
		log.V(1).Info("Association with a transitive unmanaged Elasticsearch, skip user creation",
			"name", association.Associated().GetName(), "ref_name", assocRef.Name, "es_ref_name", esAssocRef.Name)
		// this is a transitive unmanaged Elasticsearch, no user creation, update the association conf as such
		expectedAssocConf.AuthSecretName = esAssocRef.SecretName
		expectedAssocConf.AuthSecretKey = authPasswordUnmanagedSecretKey
		return r.updateAssocConf(ctx, expectedAssocConf, association)
	}

	// retrieve the Elasticsearch resource
	// (a non-empty status means getElasticsearch already decided the outcome, e.g. Elasticsearch was not found)
	es, associationStatus, err := r.getElasticsearch(ctx, association, esAssocRef)
	if associationStatus != "" || err != nil {
		return associationStatus, err
	}

	// check if reference to Elasticsearch is allowed to be established
	if allowed, err := CheckAndUnbind(ctx, r.accessReviewer, association, &es, r, r.recorder); err != nil || !allowed {
		return commonv1.AssociationPending, err
	}

	serviceAccount, err := association.ElasticServiceAccount()
	if err != nil {
		return commonv1.AssociationPending, err
	}

	// Detect if we should use a service account.
	var esHints hints.OrchestrationsHints
	if len(serviceAccount) > 0 {
		// We must first ensure that the relevant orchestration hint is set on the Elasticsearch cluster.
		esHints, err = hints.NewFrom(es)
		if err != nil {
			return commonv1.AssociationPending, err
		}
		if !esHints.ServiceAccounts.IsSet() {
			log.Info("Waiting for Elasticsearch to report if service accounts are fully rolled out")
			return commonv1.AssociationPending, nil
		}
	}

	// If it is the case create the related Secrets and update the association configuration on the associated resource.
	assocLabels := r.AssociationResourceLabels(k8s.ExtractNamespacedName(association.Associated()), assocRef.NamespacedName())
	if len(serviceAccount) > 0 && esHints.ServiceAccounts.IsTrue() {
		applicationSecretName := secretKey(association, r.ElasticsearchUserCreation.UserSecretSuffix)
		log.V(1).Info("Ensure service account exists", "sa", serviceAccount)
		err := ReconcileServiceAccounts(
			ctx,
			r.Client,
			es,
			assocLabels,
			applicationSecretName,
			UserKey(association, es.Namespace, r.ElasticsearchUserCreation.UserSecretSuffix),
			serviceAccount,
			association.GetName(),
			association.GetUID(),
		)
		if err != nil {
			return commonv1.AssociationFailed, err
		}
		expectedAssocConf.AuthSecretName = applicationSecretName.Name
		expectedAssocConf.AuthSecretKey = "token"
		expectedAssocConf.IsServiceAccount = true
		// update the association configuration if necessary
		return r.updateAssocConf(ctx, expectedAssocConf, association)
	}

	// no usable service account: fall back to a dedicated Elasticsearch user stored in a Secret
	userRole, err := r.ElasticsearchUserCreation.ESUserRole(association.Associated())
	if err != nil {
		return commonv1.AssociationFailed, err
	}

	if err := reconcileEsUserSecret(
		ctx,
		r.Client,
		association,
		assocLabels,
		userRole,
		r.ElasticsearchUserCreation.UserSecretSuffix,
		es,
	); err != nil {
		return commonv1.AssociationPending, err
	}

	authSecretRef := UserSecretKeySelector(association, r.ElasticsearchUserCreation.UserSecretSuffix)
	expectedAssocConf.AuthSecretName = authSecretRef.Name
	expectedAssocConf.AuthSecretKey = authSecretRef.Key

	// update the association configuration if necessary
	return r.updateAssocConf(ctx, expectedAssocConf, association)
}

// getElasticsearch attempts to retrieve the referenced Elasticsearch resource. If not found, it removes
// any existing association configuration on associated, and returns AssociationPending.
func (r *Reconciler) getElasticsearch( ctx context.Context, association commonv1.Association, elasticsearchRef commonv1.ObjectSelector, ) (esv1.Elasticsearch, commonv1.AssociationStatus, error) { span, ctx := apm.StartSpan(ctx, "get_elasticsearch", tracing.SpanTypeApp) defer span.End() var es esv1.Elasticsearch err := r.Get(ctx, elasticsearchRef.NamespacedName(), &es) if err != nil { k8s.MaybeEmitErrorEvent(r.recorder, err, association, events.EventAssociationError, "Failed to find referenced backend %s: %v", elasticsearchRef.NamespacedName(), err) if apierrors.IsNotFound(err) { // ES is not found, remove any existing backend configuration and retry in a bit. if err := RemoveAssociationConf(ctx, r.Client, association); err != nil && !apierrors.IsConflict(err) { ulog.FromContext(ctx).Error(err, "Failed to remove Elasticsearch association configuration") return esv1.Elasticsearch{}, commonv1.AssociationPending, err } return esv1.Elasticsearch{}, commonv1.AssociationPending, nil } return esv1.Elasticsearch{}, commonv1.AssociationFailed, err } return es, "", nil } // Unbind removes the association resources. func (r *Reconciler) Unbind(ctx context.Context, association commonv1.Association) error { // Ensure that user in Elasticsearch is deleted to prevent illegitimate access if err := k8s.DeleteSecretMatching( ctx, r.Client, r.userLabelSelector( k8s.ExtractNamespacedName(association), association.AssociationRef().NamespacedName(), )); err != nil { return err } // Also remove the association configuration return RemoveAssociationConf(ctx, r.Client, association) } // updateAssocConf updates associated with the expected association conf. 
func (r *Reconciler) updateAssocConf( ctx context.Context, expectedAssocConf *commonv1.AssociationConf, association commonv1.Association, ) (commonv1.AssociationStatus, error) { span, ctx := apm.StartSpan(ctx, "update_assoc_conf", tracing.SpanTypeApp) defer span.End() log := ulog.FromContext(ctx) assocConf, err := association.AssociationConf() if err != nil { return "", err } if !reflect.DeepEqual(expectedAssocConf, assocConf) { log.Info("Updating association configuration") if err := UpdateAssociationConf(ctx, r.Client, association, expectedAssocConf); err != nil { if apierrors.IsConflict(err) { return commonv1.AssociationPending, nil } log.Error(err, "Failed to update association configuration") return commonv1.AssociationPending, err } association.SetAssociationConf(expectedAssocConf) } return commonv1.AssociationEstablished, nil } // updateStatus updates the associated resource status. func (r *Reconciler) updateStatus(ctx context.Context, associated commonv1.Associated, newStatus commonv1.AssociationStatusMap) error { span, _ := apm.StartSpan(ctx, "update_association_status", tracing.SpanTypeApp) defer span.End() oldStatus := associated.AssociationStatusMap(r.AssociationType) // To correctly compare statuses without making the reconciler aware of singleton vs multiple associations status // differences we: set new status, get it from associated and only then compare with the oldStatus. Setting the // same status is harmless, setting a different status is fine as we have a copy of oldStatus above. 
if err := associated.SetAssociationStatusMap(r.AssociationType, newStatus); err != nil { return err } newStatus = associated.AssociationStatusMap(r.AssociationType) // shortcut if the two maps are nil or empty if len(oldStatus) == 0 && len(newStatus) == 0 { return nil } if !reflect.DeepEqual(oldStatus, newStatus) { if err := r.Status().Update(ctx, associated); err != nil { return err } annotations, err := annotation.ForAssociationStatusChange(oldStatus, newStatus) if err != nil { return err } r.recorder.AnnotatedEventf( associated, annotations, corev1.EventTypeNormal, events.EventAssociationStatusChange, "Association status changed from [%s] to [%s]", oldStatus, newStatus) } return nil } func resultFromStatuses(statusMap commonv1.AssociationStatusMap) reconcile.Result { for _, status := range statusMap { if status == commonv1.AssociationPending { return defaultRequeue // retry } } return reconcile.Result{} // we are done or there is not much we can do } func (r *Reconciler) onDelete(ctx context.Context, associated types.NamespacedName) { // remove watches r.removeWatches(associated) // delete user Secret in the Elasticsearch namespace if err := deleteOrphanedResources(ctx, r.Client, r.AssociationInfo, associated, nil); err != nil { ulog.FromContext(ctx).Error(err, "Error while trying to delete orphaned resources. Continuing.") } } // NewTestAssociationReconciler creates a new AssociationReconciler given an AssociationInfo for testing. func NewTestAssociationReconciler(assocInfo AssociationInfo, runtimeObjs ...client.Object) Reconciler { return Reconciler{ AssociationInfo: assocInfo, Client: k8s.NewFakeClient(runtimeObjs...), accessReviewer: rbac.NewPermissiveAccessReviewer(), watches: watches.NewDynamicWatches(), recorder: record.NewFakeRecorder(10), Parameters: operator.Parameters{ OperatorInfo: about.OperatorInfo{ BuildInfo: about.BuildInfo{ Version: "1.5.0", }, }, }, } }