func()

in pkg/controller/association/reconciler.go [228:416]


// reconcileAssociation reconciles the given association against the resource it references and
// returns the resulting association status together with any error encountered.
//
// The steps run in order and the function returns at the first one that settles the outcome:
//   - if the referenced object (a Secret when the reference is external) does not exist, the
//     existing association conf is removed and AssociationPending is returned;
//   - for an external reference, the conf is built by ExpectedConfigFromUnmanagedAssociation and
//     applied through updateAssocConf;
//   - otherwise the CA secret is reconciled, the optional additional secrets are processed, and
//     the service URL and the version of the referenced resource are resolved to build the
//     expected conf;
//   - if no ElasticsearchUserCreation is configured, the conf is applied with
//     NoAuthRequiredValue as the auth secret name;
//   - otherwise the (possibly transitive) Elasticsearch reference is resolved and credentials are
//     set up from the external secret, from a service account token, or from a user secret,
//     before the conf is applied.
//
// Errors are returned with either AssociationFailed or AssociationPending depending on the step
// that produced them. On the success paths the result is whatever updateAssocConf returns.
func (r *Reconciler) reconcileAssociation(ctx context.Context, association commonv1.Association) (commonv1.AssociationStatus, error) {
	assocRef := association.AssociationRef()
	log := ulog.FromContext(ctx)

	// the referenced object can be an Elastic resource or a custom Secret
	referencedObj := r.ReferencedObjTemplate()
	if assocRef.IsExternal() {
		referencedObj = &corev1.Secret{}
	}

	// check if the referenced object exists
	exists, err := k8s.ObjectExists(r.Client, assocRef.NamespacedName(), referencedObj)
	if err != nil {
		return commonv1.AssociationFailed, err
	}
	if !exists {
		// the associated resource does not exist (yet), set status to Pending and remove the existing association conf
		return commonv1.AssociationPending, RemoveAssociationConf(ctx, r.Client, association)
	}

	if assocRef.IsExternal() {
		log.V(1).Info("Association with an unmanaged resource", "name", association.Associated().GetName(), "ref_name", assocRef.Name)
		// external reference, update association conf to associate the unmanaged resource
		expectedAssocConf, err := r.ExpectedConfigFromUnmanagedAssociation(association)
		if err != nil {
			// surface the failure as a warning event on the associated resource before returning it
			r.recorder.Eventf(association.Associated(), corev1.EventTypeWarning, events.EventAssociationError, "Failed to reconcile external resource %q: %v", assocRef.NameOrSecretName(), err.Error())
			return commonv1.AssociationFailed, err
		}
		// nothing else to reconcile for an external reference: no CA secret, URL lookup or user creation below
		return r.updateAssocConf(ctx, &expectedAssocConf, association)
	}

	// reconcile the CA secret for the referenced resource; its name and CACertProvided flag go into the conf below
	caSecret, err := r.ReconcileCASecret(
		ctx,
		association,
		r.AssociationInfo.ReferencedResourceNamer,
		assocRef.NamespacedName(),
	)
	if err != nil {
		return commonv1.AssociationPending, err // maybe not created yet
	}

	// secretsHash stays nil when no AdditionalSecrets function is configured, in which case
	// AdditionalSecretsHash is left unset in the expected conf.
	var secretsHash hash.Hash32
	if r.AdditionalSecrets != nil {
		secretsHash = fnv.New32a()
		additionalSecrets, err := r.AdditionalSecrets(ctx, r.Client, association)
		if err != nil {
			return commonv1.AssociationPending, err // maybe not created yet
		}
		// each additional secret is handed to copySecret along with the hash accumulator and the
		// association namespace.
		// NOTE(review): presumably copySecret copies the secret into that namespace and feeds its
		// content into the hash — confirm against copySecret.
		for _, sec := range additionalSecrets {
			if err := copySecret(ctx, r.Client, secretsHash, association.GetNamespace(), sec); err != nil {
				return commonv1.AssociationPending, err
			}
		}
	}

	url, err := r.AssociationInfo.ExternalServiceURL(r.Client, association)
	if err != nil {
		// the Service may not have been created by the resource controller yet
		if apierrors.IsNotFound(err) {
			// a not-found error is expected here: log it and report Pending without returning the error
			log.Info("Associated resource Service is not available yet", "error", err, "name", association.Associated().GetName(), "ref_name", assocRef.Name)
			return commonv1.AssociationPending, nil
		}
		return commonv1.AssociationPending, err
	}

	// propagate the currently running version of the referenced resource (example: Elasticsearch version).
	// The Kibana controller (for example) can then delay a Kibana version upgrade if Elasticsearch is not upgraded yet.
	ver, isServerless, err := r.ReferencedResourceVersion(r.Client, association)
	if err != nil {
		return commonv1.AssociationPending, err
	}

	// construct the expected association configuration
	expectedAssocConf := &commonv1.AssociationConf{
		CACertProvided: caSecret.CACertProvided,
		CASecretName:   caSecret.Name,
		URL:            url,
		Version:        ver,
		Serverless:     isServerless,
	}

	// record the hash of the additional secrets only when some were requested (see secretsHash above)
	if secretsHash != nil {
		expectedAssocConf.AdditionalSecretsHash = fmt.Sprint(secretsHash.Sum32())
	}

	if r.ElasticsearchUserCreation == nil {
		// no user creation required, update the association conf as such
		expectedAssocConf.AuthSecretName = commonv1.NoAuthRequiredValue
		return r.updateAssocConf(ctx, expectedAssocConf, association)
	}

	// since Elasticsearch can be a transitive reference we need to use the provided ElasticsearchRef function
	found, esAssocRef, err := r.ElasticsearchUserCreation.ElasticsearchRef(r.Client, association)
	if err != nil {
		return commonv1.AssociationFailed, err
	}
	// the Elasticsearch ref does not exist yet, set status to Pending
	if !found {
		return commonv1.AssociationPending, RemoveAssociationConf(ctx, r.Client, association)
	}

	if esAssocRef.IsExternal() {
		log.V(1).Info("Association with a transitive unmanaged Elasticsearch, skip user creation",
			"name", association.Associated().GetName(), "ref_name", assocRef.Name, "es_ref_name", esAssocRef.Name)
		// this is a transitive unmanaged Elasticsearch: no user creation, the credentials are read from the
		// Secret named in the Elasticsearch reference; update the association conf as such
		expectedAssocConf.AuthSecretName = esAssocRef.SecretName
		expectedAssocConf.AuthSecretKey = authPasswordUnmanagedSecretKey
		return r.updateAssocConf(ctx, expectedAssocConf, association)
	}

	// retrieve the Elasticsearch resource
	es, associationStatus, err := r.getElasticsearch(ctx, association, esAssocRef)
	// a non-empty status or an error from getElasticsearch ends the reconciliation with that result
	if associationStatus != "" || err != nil {
		return associationStatus, err
	}

	// check if reference to Elasticsearch is allowed to be established; when the check errors or the
	// association is not allowed, report Pending (with a nil error if it is simply not allowed).
	// NOTE(review): presumably CheckAndUnbind also removes the existing association when access is denied — confirm.
	if allowed, err := CheckAndUnbind(ctx, r.accessReviewer, association, &es, r, r.recorder); err != nil || !allowed {
		return commonv1.AssociationPending, err
	}

	serviceAccount, err := association.ElasticServiceAccount()
	if err != nil {
		return commonv1.AssociationPending, err
	}
	// Detect if we should use a service account: a non-empty serviceAccount means one is requested.
	var esHints hints.OrchestrationsHints
	if len(serviceAccount) > 0 {
		// We must first ensure that the relevant orchestration hint is set on the Elasticsearch cluster.
		esHints, err = hints.NewFrom(es)
		if err != nil {
			return commonv1.AssociationPending, err
		}
		if !esHints.ServiceAccounts.IsSet() {
			// the hint has not been reported yet: wait rather than choosing an authentication mode now
			log.Info("Waiting for Elasticsearch to report if service accounts are fully rolled out")
			return commonv1.AssociationPending, nil
		}
	}

	// If a service account is requested and the Elasticsearch hint reports true, create the related Secrets
	// and update the association configuration on the associated resource. In every other case (no service
	// account requested, or the hint is set but not true) fall through to the user based authentication below.
	assocLabels := r.AssociationResourceLabels(k8s.ExtractNamespacedName(association.Associated()), assocRef.NamespacedName())
	if len(serviceAccount) > 0 && esHints.ServiceAccounts.IsTrue() {
		applicationSecretName := secretKey(association, r.ElasticsearchUserCreation.UserSecretSuffix)
		log.V(1).Info("Ensure service account exists", "sa", serviceAccount)
		err := ReconcileServiceAccounts(
			ctx,
			r.Client,
			es,
			assocLabels,
			applicationSecretName,
			UserKey(association, es.Namespace, r.ElasticsearchUserCreation.UserSecretSuffix),
			serviceAccount,
			association.GetName(),
			association.GetUID(),
		)
		if err != nil {
			return commonv1.AssociationFailed, err
		}
		// the associated resource authenticates with the value stored under the "token" key of the application secret
		expectedAssocConf.AuthSecretName = applicationSecretName.Name
		expectedAssocConf.AuthSecretKey = "token"
		expectedAssocConf.IsServiceAccount = true
		// update the association configuration if necessary
		return r.updateAssocConf(ctx, expectedAssocConf, association)
	}

	// user based authentication: resolve the role to grant for the associated resource
	userRole, err := r.ElasticsearchUserCreation.ESUserRole(association.Associated())
	if err != nil {
		return commonv1.AssociationFailed, err
	}

	// reconcile the user secret for this association, using the role resolved above
	if err := reconcileEsUserSecret(
		ctx,
		r.Client,
		association,
		assocLabels,
		userRole,
		r.ElasticsearchUserCreation.UserSecretSuffix,
		es,
	); err != nil {
		return commonv1.AssociationPending, err
	}

	// reference the user secret (name and key) in the expected association conf
	authSecretRef := UserSecretKeySelector(association, r.ElasticsearchUserCreation.UserSecretSuffix)
	expectedAssocConf.AuthSecretName = authSecretRef.Name
	expectedAssocConf.AuthSecretKey = authSecretRef.Key

	// update the association configuration if necessary
	return r.updateAssocConf(ctx, expectedAssocConf, association)
}