pkg/controller/remotecluster/watches.go (113 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package remotecluster import ( "context" "fmt" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/remotecluster/keystore" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/certificates/remoteca" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/certificates/transport" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/maps" ) // AddWatches set watches on objects needed to manage the association between a local and a remote cluster. func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileRemoteClusters) error { // Watch for changes to RemoteCluster if err := c.Watch(source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}, &handler.TypedEnqueueRequestForObject[*esv1.Elasticsearch]{})); err != nil { return err } // Emit changes to remote clusters to update API keys. if err := c.Watch( source.Kind( mgr.GetCache(), &esv1.Elasticsearch{}, handler.TypedEnqueueRequestsFromMapFunc[*esv1.Elasticsearch, reconcile.Request]( func(ctx context.Context, elasticsearch *esv1.Elasticsearch) []reconcile.Request { requests := make([]reconcile.Request, 0, len(elasticsearch.Spec.RemoteClusters)) for _, remoteCluster := range elasticsearch.Spec.RemoteClusters { requests = append(requests, reconcile.Request{NamespacedName: remoteCluster.ElasticsearchRef.WithDefaultNamespace(elasticsearch.Namespace).NamespacedName()}) } return requests }, ), ), ); err != nil { return err } // Watch Secrets that contain: // * Remote certificate authorities managed by this controller. // * API keys if err := c.Watch( source.Kind(mgr.GetCache(), &v1.Secret{}, handler.TypedEnqueueRequestsFromMapFunc[*v1.Secret, reconcile.Request](newRequestsFromMatchedLabels()), )); err != nil { return err } // Dynamically watches the certificate authorities involved in a cluster relationship if err := c.Watch(source.Kind(mgr.GetCache(), &v1.Secret{}, r.watches.Secrets)); err != nil { return err } return r.watches.Secrets.AddHandlers( &watches.OwnerWatch[*corev1.Secret]{ Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper(), OwnerType: &esv1.Elasticsearch{}, IsController: true, }, ) } // newRequestsFromMatchedLabels creates a watch handler function that creates reconcile requests based on the // labels set on a Secret which contains the remote CA. func newRequestsFromMatchedLabels() handler.TypedMapFunc[*v1.Secret, reconcile.Request] { return func(ctx context.Context, obj *v1.Secret) []reconcile.Request { labels := obj.GetLabels() if maps.ContainsKeys(labels, RemoteClusterNameLabelName, RemoteClusterNamespaceLabelName, commonv1.TypeLabelName) { // Remote cluster CA if labels[commonv1.TypeLabelName] != remoteca.TypeLabelValue { return nil } return []reconcile.Request{ {NamespacedName: types.NamespacedName{ Namespace: labels[RemoteClusterNamespaceLabelName], Name: labels[RemoteClusterNameLabelName]}, }, } } if maps.ContainsKeys(labels, label.ClusterNameLabelName, commonv1.TypeLabelName) { if labels[commonv1.TypeLabelName] != keystore.RemoteClusterAPIKeysType { return nil } // Remote cluster API keys Secret event. return []reconcile.Request{ {NamespacedName: types.NamespacedName{ Namespace: obj.Namespace, Name: labels[label.ClusterNameLabelName]}, }, } } return nil } } func watchName(local types.NamespacedName, remote types.NamespacedName) string { return fmt.Sprintf( "%s-%s-%s-%s", local.Namespace, local.Name, remote.Namespace, remote.Name, ) } // addCertificatesAuthorityWatches sets some watches on all secrets containing the certificate of a CA involved in a association. // The local CA is watched to update the trusted certificates in the remote clusters. // The remote CAs are watched to update the trusted certificates of the local cluster. func addCertificatesAuthorityWatches( reconcileClusterAssociation *ReconcileRemoteClusters, local, remote types.NamespacedName) error { // Watch the CA secret of Elasticsearch clusters which are involved in a association. err := reconcileClusterAssociation.watches.Secrets.AddHandler(watches.NamedWatch[*corev1.Secret]{ Name: watchName(local, remote), Watched: []types.NamespacedName{transport.PublicCertsSecretRef(remote)}, Watcher: types.NamespacedName{ Namespace: local.Namespace, Name: local.Name, }, }) if err != nil { return err } return nil }