in pkg/controller/association/reconciler.go [228:416]
func (r *Reconciler) reconcileAssociation(ctx context.Context, association commonv1.Association) (commonv1.AssociationStatus, error) {
assocRef := association.AssociationRef()
log := ulog.FromContext(ctx)
// the referenced object can be an Elastic resource or a custom Secret
referencedObj := r.ReferencedObjTemplate()
if assocRef.IsExternal() {
referencedObj = &corev1.Secret{}
}
// check if the referenced object exists
exists, err := k8s.ObjectExists(r.Client, assocRef.NamespacedName(), referencedObj)
if err != nil {
return commonv1.AssociationFailed, err
}
if !exists {
// the associated resource does not exist (yet), set status to Pending and remove the existing association conf
return commonv1.AssociationPending, RemoveAssociationConf(ctx, r.Client, association)
}
if assocRef.IsExternal() {
log.V(1).Info("Association with an unmanaged resource", "name", association.Associated().GetName(), "ref_name", assocRef.Name)
// external reference, update association conf to associate the unmanaged resource
expectedAssocConf, err := r.ExpectedConfigFromUnmanagedAssociation(association)
if err != nil {
r.recorder.Eventf(association.Associated(), corev1.EventTypeWarning, events.EventAssociationError, "Failed to reconcile external resource %q: %v", assocRef.NameOrSecretName(), err.Error())
return commonv1.AssociationFailed, err
}
return r.updateAssocConf(ctx, &expectedAssocConf, association)
}
caSecret, err := r.ReconcileCASecret(
ctx,
association,
r.AssociationInfo.ReferencedResourceNamer,
assocRef.NamespacedName(),
)
if err != nil {
return commonv1.AssociationPending, err // maybe not created yet
}
var secretsHash hash.Hash32
if r.AdditionalSecrets != nil {
secretsHash = fnv.New32a()
additionalSecrets, err := r.AdditionalSecrets(ctx, r.Client, association)
if err != nil {
return commonv1.AssociationPending, err // maybe not created yet
}
for _, sec := range additionalSecrets {
if err := copySecret(ctx, r.Client, secretsHash, association.GetNamespace(), sec); err != nil {
return commonv1.AssociationPending, err
}
}
}
url, err := r.AssociationInfo.ExternalServiceURL(r.Client, association)
if err != nil {
// the Service may not have been created by the resource controller yet
if apierrors.IsNotFound(err) {
log.Info("Associated resource Service is not available yet", "error", err, "name", association.Associated().GetName(), "ref_name", assocRef.Name)
return commonv1.AssociationPending, nil
}
return commonv1.AssociationPending, err
}
// propagate the currently running version of the referenced resource (example: Elasticsearch version).
// The Kibana controller (for example) can then delay a Kibana version upgrade if Elasticsearch is not upgraded yet.
ver, isServerless, err := r.ReferencedResourceVersion(r.Client, association)
if err != nil {
return commonv1.AssociationPending, err
}
// construct the expected association configuration
expectedAssocConf := &commonv1.AssociationConf{
CACertProvided: caSecret.CACertProvided,
CASecretName: caSecret.Name,
URL: url,
Version: ver,
Serverless: isServerless,
}
if secretsHash != nil {
expectedAssocConf.AdditionalSecretsHash = fmt.Sprint(secretsHash.Sum32())
}
if r.ElasticsearchUserCreation == nil {
// no user creation required, update the association conf as such
expectedAssocConf.AuthSecretName = commonv1.NoAuthRequiredValue
return r.updateAssocConf(ctx, expectedAssocConf, association)
}
// since Elasticsearch can be a transitive reference we need to use the provided ElasticsearchRef function
found, esAssocRef, err := r.ElasticsearchUserCreation.ElasticsearchRef(r.Client, association)
if err != nil {
return commonv1.AssociationFailed, err
}
// the Elasticsearch ref does not exist yet, set status to Pending
if !found {
return commonv1.AssociationPending, RemoveAssociationConf(ctx, r.Client, association)
}
if esAssocRef.IsExternal() {
log.V(1).Info("Association with a transitive unmanaged Elasticsearch, skip user creation",
"name", association.Associated().GetName(), "ref_name", assocRef.Name, "es_ref_name", esAssocRef.Name)
// this a transitive unmanaged Elasticsearch, no user creation, update the association conf as such
expectedAssocConf.AuthSecretName = esAssocRef.SecretName
expectedAssocConf.AuthSecretKey = authPasswordUnmanagedSecretKey
return r.updateAssocConf(ctx, expectedAssocConf, association)
}
// retrieve the Elasticsearch resource
es, associationStatus, err := r.getElasticsearch(ctx, association, esAssocRef)
if associationStatus != "" || err != nil {
return associationStatus, err
}
// check if reference to Elasticsearch is allowed to be established
if allowed, err := CheckAndUnbind(ctx, r.accessReviewer, association, &es, r, r.recorder); err != nil || !allowed {
return commonv1.AssociationPending, err
}
serviceAccount, err := association.ElasticServiceAccount()
if err != nil {
return commonv1.AssociationPending, err
}
// Detect if we should use a service account.
var esHints hints.OrchestrationsHints
if len(serviceAccount) > 0 {
// We must first ensure that the relevant orchestration hint is set on the Elasticsearch cluster.
esHints, err = hints.NewFrom(es)
if err != nil {
return commonv1.AssociationPending, err
}
if !esHints.ServiceAccounts.IsSet() {
log.Info("Waiting for Elasticsearch to report if service accounts are fully rolled out")
return commonv1.AssociationPending, nil
}
}
// If it is the case create the related Secrets and update the association configuration on the associated resource.
assocLabels := r.AssociationResourceLabels(k8s.ExtractNamespacedName(association.Associated()), assocRef.NamespacedName())
if len(serviceAccount) > 0 && esHints.ServiceAccounts.IsTrue() {
applicationSecretName := secretKey(association, r.ElasticsearchUserCreation.UserSecretSuffix)
log.V(1).Info("Ensure service account exists", "sa", serviceAccount)
err := ReconcileServiceAccounts(
ctx,
r.Client,
es,
assocLabels,
applicationSecretName,
UserKey(association, es.Namespace, r.ElasticsearchUserCreation.UserSecretSuffix),
serviceAccount,
association.GetName(),
association.GetUID(),
)
if err != nil {
return commonv1.AssociationFailed, err
}
expectedAssocConf.AuthSecretName = applicationSecretName.Name
expectedAssocConf.AuthSecretKey = "token"
expectedAssocConf.IsServiceAccount = true
// update the association configuration if necessary
return r.updateAssocConf(ctx, expectedAssocConf, association)
}
userRole, err := r.ElasticsearchUserCreation.ESUserRole(association.Associated())
if err != nil {
return commonv1.AssociationFailed, err
}
if err := reconcileEsUserSecret(
ctx,
r.Client,
association,
assocLabels,
userRole,
r.ElasticsearchUserCreation.UserSecretSuffix,
es,
); err != nil {
return commonv1.AssociationPending, err
}
authSecretRef := UserSecretKeySelector(association, r.ElasticsearchUserCreation.UserSecretSuffix)
expectedAssocConf.AuthSecretName = authSecretRef.Name
expectedAssocConf.AuthSecretKey = authSecretRef.Key
// update the association configuration if necessary
return r.updateAssocConf(ctx, expectedAssocConf, association)
}