func()

in pkg/controller/association/dynamic_watches.go [54:129]


// reconcileWatches reconciles, in a fixed order, every dynamic watch kept on behalf of the
// associated resource for the given associations:
//
//   - the referenced resource itself (when managed by ECK) or the user-provided secret describing
//     how to reach it (when not managed by ECK);
//   - the CA secret of the referenced resource;
//   - any custom service named in the association reference;
//   - the Elasticsearch user secret, only when r.ElasticsearchUserCreation is set;
//   - the secrets reported by r.AdditionalSecrets, only when that hook is set.
//
// The first error returned by any of these steps aborts the sequence and is returned unchanged.
func (r *Reconciler) reconcileWatches(ctx context.Context, associated types.NamespacedName, associations []commonv1.Association) error {
	managed := filterManagedElasticRef(associations)
	unmanaged := filterUnmanagedElasticRef(associations)

	// refKey resolves an association to the namespaced name held by its association reference.
	refKey := func(assoc commonv1.Association) types.NamespacedName {
		return assoc.AssociationRef().NamespacedName()
	}

	// The referenced resource is either managed by ECK or not; the two modes are exclusive and
	// both use referencedResourceWatchName.
	// Mode 1: the referenced resource is managed by ECK, so watch the resource directly.
	err := ReconcileWatch(associated, managed, r.watches.ReferencedResources, referencedResourceWatchName(associated), refKey)
	if err != nil {
		return err
	}
	// Mode 2: the referenced resource is not managed by ECK, so watch the custom user secret that
	// describes how to connect to it.
	err = ReconcileWatch(associated, unmanaged, r.watches.Secrets, referencedResourceWatchName(associated), refKey)
	if err != nil {
		return err
	}

	// The CA secret of the referenced resource is looked up in the referenced resource namespace.
	caSecretKey := func(assoc commonv1.Association) types.NamespacedName {
		target := assoc.AssociationRef()
		secretName := certificates.PublicCertsSecretName(r.AssociationInfo.ReferencedResourceNamer, target.NameOrSecretName())
		return types.NamespacedName{Namespace: target.Namespace, Name: secretName}
	}
	err = ReconcileWatch(associated, managed, r.watches.Secrets, referencedResourceCASecretWatchName(associated), caSecretKey)
	if err != nil {
		return err
	}

	// Custom services set up by users are watched so that service updates which are not error
	// related are picked up too (error related ones are already covered by re-queueing on
	// unsuccessful reconciliation).
	serviceKey := func(assoc commonv1.Association) types.NamespacedName {
		target := assoc.AssociationRef()
		return types.NamespacedName{Namespace: target.Namespace, Name: target.ServiceName}
	}
	err = ReconcileWatch(associated, filterWithServiceName(associations), r.watches.Services, serviceWatchName(associated), serviceKey)
	if err != nil {
		return err
	}

	// The Elasticsearch user secret, located in the Elasticsearch namespace, is only watched when
	// user creation is configured.
	if r.ElasticsearchUserCreation != nil {
		userSecretKey := func(assoc commonv1.Association) types.NamespacedName {
			return UserKey(assoc, assoc.AssociationRef().Namespace, r.ElasticsearchUserCreation.UserSecretSuffix)
		}
		err = ReconcileWatch(associated, managed, r.watches.Secrets, esUserWatchName(associated), userSecretKey)
		if err != nil {
			return err
		}
	}

	// Nothing more to reconcile when no additional secrets hook is configured.
	if r.AdditionalSecrets == nil {
		return nil
	}

	// collectSecretKeys lists, for every association, the additional secrets reported by the hook
	// followed by one same-named key per secret in the namespace of the association.
	collectSecretKeys := func() ([]types.NamespacedName, error) {
		var keys []types.NamespacedName
		for _, assoc := range associations {
			sources, err := r.AdditionalSecrets(ctx, r.Client, assoc)
			if err != nil {
				return nil, err
			}
			// Source secrets come first...
			keys = append(keys, sources...)
			// ...then the target secrets: same name, association namespace.
			for _, source := range sources {
				keys = append(keys, types.NamespacedName{Namespace: assoc.GetNamespace(), Name: source.Name})
			}
		}
		return keys, nil
	}
	return reconcileGenericWatch(associated, managed, r.watches.Secrets, additionalSecretWatchName(associated), collectSecretKeys)
}