pkg/controller/remotecluster/controller.go (347 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package remotecluster
import (
"context"
"fmt"
"time"
"go.elastic.co/apm/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/association"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
commonesclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/esclient"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/license"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/certificates/remoteca"
esclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/services"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/remotecluster/keystore"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/rbac"
)
// name identifies this controller: it is used as the controller name, the reconciliation
// context name and the event recorder name.
const name = "remotecluster-controller"

// EventReasonClusterCaCertNotFound is the event reason "ClusterCaCertNotFound". It is not referenced
// in this file; presumably it is emitted when the CA certificate of a cluster cannot be found.
const EventReasonClusterCaCertNotFound = "ClusterCaCertNotFound"
// defaultRequeue asks for the request to be requeued after 10 seconds. It is returned when the
// license check fails or when the Elasticsearch API has no endpoints yet.
var defaultRequeue = reconcile.Result{RequeueAfter: 10 * time.Second, Requeue: true}
// Add registers the remote cluster controller with the manager: it builds the reconciler,
// wraps it into a controller named after this controller and sets up its watches.
func Add(mgr manager.Manager, accessReviewer rbac.AccessReviewer, params operator.Parameters) error {
	remoteClusterReconciler := NewReconciler(mgr, accessReviewer, params)
	ctrl, err := common.NewController(mgr, name, remoteClusterReconciler, params)
	if err != nil {
		return err
	}
	return addWatches(mgr, ctrl, remoteClusterReconciler)
}
// NewReconciler builds a ReconcileRemoteClusters backed by the manager's client and event recorder.
// The keystore provider and license checker share the same Kubernetes client.
func NewReconciler(mgr manager.Manager, accessReviewer rbac.AccessReviewer, params operator.Parameters) *ReconcileRemoteClusters {
	k8sClient := mgr.GetClient()
	return &ReconcileRemoteClusters{
		Client:           k8sClient,
		Parameters:       params,
		accessReviewer:   accessReviewer,
		esClientProvider: commonesclient.NewClient,
		keystoreProvider: keystore.NewProvider(k8sClient),
		watches:          watches.NewDynamicWatches(),
		recorder:         mgr.GetEventRecorderFor(name),
		licenseChecker:   license.NewLicenseChecker(k8sClient, params.OperatorNamespace),
	}
}
// Compile-time check that *ReconcileRemoteClusters implements reconcile.Reconciler.
var _ reconcile.Reconciler = (*ReconcileRemoteClusters)(nil)
// ReconcileRemoteClusters reconciles remote clusters Secrets and API Keys.
// It copies the remote CA certificates between associated Elasticsearch clusters and, for clusters
// supporting remote cluster API keys, manages the cross-cluster API keys and the keystores holding them.
type ReconcileRemoteClusters struct {
	// Kubernetes client, embedded so that the reconciler can call Get/List directly.
	k8s.Client
	// Operator parameters; Tracer and Dialer are read from it during the reconciliation.
	operator.Parameters
	// accessReviewer is used to check whether a remote cluster association is allowed, and to decide
	// when the RBAC check must be requeued (association.RequeueRbacCheck).
	accessReviewer rbac.AccessReviewer
	// recorder emits Kubernetes events on behalf of this controller.
	recorder record.EventRecorder
	// watches holds the dynamic watches of this controller (presumably registered by addWatches, not visible here).
	watches watches.DynamicWatches
	// licenseChecker tells whether enterprise features, required by this controller, are enabled.
	licenseChecker license.Checker
	// esClientProvider builds the Elasticsearch client used to get, create and invalidate cross-cluster API keys.
	esClientProvider commonesclient.Provider
	// keystoreProvider gives access to the API keystore of a cluster; ForgetCluster is called on it when the
	// reconciled Elasticsearch cluster is not found.
	keystoreProvider *keystore.Provider
	// iteration is the number of times this controller has run its Reconcile method
	iteration uint64
}
// Reconcile reconciles the remote cluster associations of the Elasticsearch cluster named in the request.
// The remote CA Secrets are copied so that every peer Elasticsearch cluster can trust each other.
// When the Elasticsearch cluster no longer exists, its keystore state is forgotten and all the remote
// CAs associated with it are deleted. Unmanaged clusters are skipped.
func (r *ReconcileRemoteClusters) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	ctx = common.NewReconciliationContext(ctx, &r.iteration, r.Tracer, name, "es_name", request)
	defer common.LogReconciliationRun(ulog.FromContext(ctx))()
	defer tracing.EndContextTransaction(ctx)

	var es esv1.Elasticsearch
	switch err := r.Get(ctx, request.NamespacedName, &es); {
	case errors.IsNotFound(err):
		// The cluster is gone: drop its keystore state and clean up the CAs exchanged with its peers.
		r.keystoreProvider.ForgetCluster(request.NamespacedName)
		return deleteAllRemoteCa(ctx, r, request.NamespacedName)
	case err != nil:
		return reconcile.Result{}, err
	}

	if common.IsUnmanaged(ctx, &es) {
		ulog.FromContext(ctx).Info("Object is currently not managed by this controller. Skipping reconciliation", "namespace", es.Namespace, "es_name", es.Name)
		return reconcile.Result{}, nil
	}
	return doReconcile(ctx, r, &es)
}
// deleteAllRemoteCa removes every remote certificate authority associated with the given cluster,
// in both directions. Deletion carries on after a failure; all errors are aggregated in the result.
func deleteAllRemoteCa(ctx context.Context, r *ReconcileRemoteClusters, es types.NamespacedName) (reconcile.Result, error) {
	span, _ := apm.StartSpan(ctx, "delete_all_remote_ca", tracing.SpanTypeApp)
	defer span.End()

	peers, err := getAssociatedRemoteCAs(ctx, r.Client, es)
	if err != nil {
		return reconcile.Result{}, err
	}

	results := &reconciler.Results{}
	for peer := range peers {
		// A nil error is ignored by WithError; keep going so one failure does not block the others.
		results.WithError(deleteCertificateAuthorities(ctx, r, es, peer))
	}
	return results.Aggregate()
}
// doReconcile reconciles the remote cluster associations of remoteServer, the Elasticsearch cluster being reconciled.
//
// The expected peers of remoteServer are the clusters declared in its spec and the clusters declaring it as a
// remote cluster. For each expected peer it:
//  1. checks that the association is allowed (RBAC); if not, the API keys for that peer are invalidated,
//  2. creates or updates the remote CAs (createOrUpdateCertificateAuthorities),
//  3. reconciles the cross-cluster API keys for that peer once both cluster versions are known and compatible.
//
// Then, when remoteServer supports remote cluster API keys, it invalidates the "eck-*" API keys belonging to
// clusters which have not been reconciled above, and removes unexpected aliases from remoteServer's API keystore.
// Finally, it deletes the remote CAs of associations which are no longer expected.
// Errors are accumulated in a reconciler.Results where possible; lookup, RBAC check and CA errors return early.
func doReconcile(
	ctx context.Context,
	r *ReconcileRemoteClusters,
	remoteServer *esv1.Elasticsearch,
) (reconcile.Result, error) {
	log := ulog.FromContext(ctx)
	remoteServerKey := k8s.ExtractNamespacedName(remoteServer)
	// Peers of remoteServer: the clusters declared in its spec (nil refs) and the clusters declaring it as a
	// remote cluster (with their RemoteCluster entries pointing to remoteServer).
	expectedRemoteClients, err := getExpectedRemoteClientsFor(ctx, r.Client, remoteServer)
	if err != nil {
		return reconcile.Result{}, err
	}
	enabled, err := r.licenseChecker.EnterpriseFeaturesEnabled(ctx)
	if err != nil {
		return defaultRequeue, err
	}
	// Remote clusters are an enterprise feature: stop here if some associations are expected.
	// When no association is expected, the cleanup below still runs.
	if !enabled && len(expectedRemoteClients) > 0 {
		log.V(1).Info(
			"Remote cluster controller is an enterprise feature. Enterprise features are disabled",
			"namespace", remoteServer.Namespace, "es_name", remoteServer.Name,
		)
		return reconcile.Result{}, nil
	}
	// Get all the clusters to which this reconciled cluster is connected to according to the existing remote CAs.
	// associatedRemoteCAs is used to delete the CA certificates and cancel any trust relationships
	// that may have existed in the past but should not exist anymore.
	associatedRemoteCAs, err := getAssociatedRemoteCAs(ctx, r.Client, remoteServerKey)
	if err != nil {
		return reconcile.Result{}, err
	}
	var (
		// Only populated when remoteServer supports remote cluster API keys; otherwise they keep
		// their zero value (empty key list, nil client).
		activeAPIKeys esclient.CrossClusterAPIKeyList
		esClient      esclient.Client
	)
	remoteServerSupportsClusterAPIKeys, err := remoteServer.SupportsRemoteClusterAPIKeys()
	if err != nil {
		return reconcile.Result{}, err
	}
	results := &reconciler.Results{}
	if remoteServerSupportsClusterAPIKeys.IsTrue() {
		// Check if the ES API is available. We need it to create, update and invalidate
		// API keys in this cluster.
		if !services.NewElasticsearchURLProvider(*remoteServer, r.Client).HasEndpoints() {
			log.Info("Elasticsearch API is not available yet")
			return results.WithResult(defaultRequeue).Aggregate()
		}
		// Create a new client
		newEsClient, err := r.esClientProvider(ctx, r.Client, r.Dialer, *remoteServer)
		if err != nil {
			return reconcile.Result{}, err
		}
		// Keep the client: it is used below to manage the API keys of this cluster.
		esClient = newEsClient
		// Get all the API Keys whose name matches "eck-*" on the reconciled cluster.
		crossClusterAPIKeys, err := esClient.GetCrossClusterAPIKeys(ctx, "eck-*")
		if err != nil {
			return reconcile.Result{}, err
		}
		activeAPIKeys = crossClusterAPIKeys
	}
	// apiKeyReconciledRemoteClients is used to track all the client clusters for which API keys have already been reconciled.
	// This is used to garbage collect API keys for clusters which have been deleted and are not in expectedRemoteClusters.
	apiKeyReconciledRemoteClients := sets.New[types.NamespacedName]()
	// Main loop to:
	// 1. Create or update expected remote CA.
	// 2. Create or update API keys and keystores.
	for remoteClientKey, remoteClusterRefs := range expectedRemoteClients {
		// Get the remote/client Elasticsearch cluster associated with this local/reconciled cluster.
		remoteClient := &esv1.Elasticsearch{}
		if err := r.Client.Get(ctx, remoteClientKey, remoteClient); err != nil {
			if errors.IsNotFound(err) {
				// Remote client cluster does not exist, invalidate API keys for that client cluster.
				// NOTE(review): remoteClient is presumably still the zero-valued object here (empty name and
				// namespace). Confirm that reconcileAPIKeys does not rely on its identity to select the keys to
				// invalidate, since remoteClientKey is also excluded from the orphaned API keys cleanup below.
				apiKeyReconciledRemoteClients.Insert(remoteClientKey)
				results.WithResults(reconcileAPIKeys(ctx, r.Client, activeAPIKeys, remoteServer, remoteClient, nil, esClient, r.keystoreProvider))
				continue
			}
			return reconcile.Result{}, err
		}
		log := log.WithValues(
			"remote_server_namespace", remoteServer.Namespace,
			"remote_server", remoteServer.Name,
			"remote_client_namespace", remoteClient.Namespace,
			"remote_client_name", remoteClient.Name,
		)
		accessAllowed, err := isRemoteClusterAssociationAllowed(ctx, r.accessReviewer, remoteServer, remoteClient, r.recorder)
		if err != nil {
			return reconcile.Result{}, err
		}
		// if the remote CA exists but isn't allowed anymore, it will be deleted next
		// (it is not removed from associatedRemoteCAs, which drives the deletion at the end).
		if !accessAllowed {
			// Remove from the expected remote cluster to clean up local keystore.
			// Deleting the current key while ranging over the map is allowed in Go.
			delete(expectedRemoteClients, remoteClientKey)
			// Invalidate API keys for that client cluster.
			apiKeyReconciledRemoteClients.Insert(remoteClientKey)
			results.WithResults(reconcileAPIKeys(ctx, r.Client, activeAPIKeys, remoteServer, remoteClient, nil, esClient, r.keystoreProvider))
			continue
		}
		// The association is allowed: its remote CA must be kept, so it is no longer a deletion candidate.
		delete(associatedRemoteCAs, remoteClientKey)
		results.WithResults(createOrUpdateCertificateAuthorities(ctx, r, remoteServer, remoteClient))
		// NOTE(review): results is shared across iterations, so HasError presumably also reports errors
		// accumulated while handling previous peers, not only the CA reconciliation just above.
		if results.HasError() {
			return results.Aggregate()
		}
		// RCS2, first check that both the reconciled and the client clusters are compatible.
		// NOTE(review): in the error/"not set"/incompatible branches below, remoteClientKey is not added to
		// apiKeyReconciledRemoteClients, so any existing API key for this still-expected client is invalidated
		// by the orphaned API keys cleanup below. Confirm this is intended (the tracking set is documented
		// as being for deleted clusters only).
		clientClusterSupportsClusterAPIKeys, err := remoteClient.SupportsRemoteClusterAPIKeys()
		if err != nil {
			results.WithError(err)
			continue
		}
		if !clientClusterSupportsClusterAPIKeys.IsSet() {
			log.Info("Client cluster version is not available in status yet, skipping API keys reconciliation")
			continue
		}
		if !remoteServerSupportsClusterAPIKeys.IsSet() {
			log.Info("Cluster version is not available in status yet, skipping API keys reconciliation")
			continue
		}
		if clientClusterSupportsClusterAPIKeys.IsFalse() && remoteServerSupportsClusterAPIKeys.IsTrue() {
			err := fmt.Errorf("client cluster %s/%s is running version %s which does not support remote cluster keys", remoteClient.Namespace, remoteClient.Name, remoteClient.Spec.Version)
			log.Error(err, "cannot configure remote cluster settings")
			continue
		}
		// Reconcile the API Keys.
		apiKeyReconciledRemoteClients.Insert(remoteClientKey)
		results.WithResults(reconcileAPIKeys(ctx, r.Client, activeAPIKeys, remoteServer, remoteClient, remoteClusterRefs, esClient, r.keystoreProvider))
	}
	if remoteServerSupportsClusterAPIKeys.IsTrue() {
		// **************************************************************
		// Delete orphaned API keys from clusters which have been deleted
		// **************************************************************
		for _, activeAPIKey := range activeAPIKeys.APIKeys {
			clientCluster, err := activeAPIKey.GetElasticsearchName()
			if err != nil {
				results.WithError(err)
				continue
			}
			if _, exists := apiKeyReconciledRemoteClients[clientCluster]; exists {
				// API keys for that client cluster have already been reconciled, skip.
				continue
			}
			// This API key in the local cluster state belongs to an unknown cluster which is not expected and has not been reconciled.
			log.Info(fmt.Sprintf("Invalidating API key %s which belongs to unknown cluster %s", activeAPIKey.Name, clientCluster))
			results.WithError(esClient.InvalidateCrossClusterAPIKey(ctx, activeAPIKey.Name))
		}
		// *********************************************
		// Delete unexpected keys in the local keystore.
		// *********************************************
		// The keystore of remoteServer holds the API keys it uses as a client of the clusters in its own spec;
		// expected aliases are those spec entries configured with an API key and still expected (e.g. RBAC allowed).
		expectedAliases := expectedAliases(remoteServer, expectedRemoteClients)
		apiKeyStore, err := r.keystoreProvider.ForCluster(ctx, log, remoteServer)
		if err != nil {
			return results.WithError(err).Aggregate()
		}
		for alias := range apiKeyStore.GetAliases() {
			if expectedAliases.Has(alias) {
				// Expected alias
				continue
			}
			// Unexpected
			log.Info(fmt.Sprintf("Removing unexpected remote API key %s", alias))
			apiKeyStore.Delete(alias)
		}
		results.WithResults(apiKeyStore.Save(ctx, r.Client, remoteServer))
	}
	// Delete existing but not expected remote CA
	for toDelete := range associatedRemoteCAs {
		log.V(1).Info("Deleting remote CA",
			"local_namespace", remoteServer.Namespace,
			"local_name", remoteServer.Name,
			"remote_namespace", toDelete.Namespace,
			"remote_name", toDelete.Name,
		)
		results.WithError(deleteCertificateAuthorities(ctx, r, remoteServerKey, toDelete))
	}
	return results.WithResult(association.RequeueRbacCheck(r.accessReviewer)).Aggregate()
}
// expectedAliases returns the names of the remote clusters declared in localCluster's spec which must have an
// alias in its API keystore: those configured with an API key and whose referenced cluster is still present
// in expectedRemoteCluster (entries may have been removed there because of RBAC rules).
func expectedAliases(
	localCluster *esv1.Elasticsearch,
	expectedRemoteCluster map[types.NamespacedName][]esv1.RemoteCluster,
) sets.Set[string] {
	aliases := sets.New[string]()
	for _, rc := range localCluster.Spec.RemoteClusters {
		if rc.APIKey == nil {
			// Not using remote cluster server.
			continue
		}
		target := rc.ElasticsearchRef.WithDefaultNamespace(localCluster.Namespace).NamespacedName()
		if _, stillExpected := expectedRemoteCluster[target]; stillExpected {
			aliases.Insert(rc.Name)
		}
	}
	return aliases
}
// caCertMissingError builds the message reporting that the CA certificate of the given cluster,
// formatted as namespace/name, cannot be found.
func caCertMissingError(cluster types.NamespacedName) string {
	return "Cannot find CA certificate cluster " + cluster.Namespace + "/" + cluster.Name
}
// getExpectedRemoteClientsFor returns all the remote cluster keys for which a remote CA and an API Key should be created.
// The CA certificates must be copied from the remote cluster to the local one and vice versa.
// The API Key is created in the remote cluster and injected in the keystore of the local cluster.
//
// The returned map contains:
//   - the clusters declared in the spec of associatedEs, with a nil value unless they also declare
//     associatedEs as a remote cluster;
//   - the clusters declaring associatedEs as a remote cluster, mapped to the matching RemoteCluster
//     entries of their own spec.
func getExpectedRemoteClientsFor(
	ctx context.Context,
	c k8s.Client,
	associatedEs *esv1.Elasticsearch,
) (map[types.NamespacedName][]esv1.RemoteCluster, error) {
	span, _ := apm.StartSpan(ctx, "get_expected_remote_clusters", tracing.SpanTypeApp)
	defer span.End()

	expectedRemoteClusters := make(map[types.NamespacedName][]esv1.RemoteCluster)

	// Add the remote clusters declared in the Spec.
	for _, remoteCluster := range associatedEs.Spec.RemoteClusters {
		if !remoteCluster.ElasticsearchRef.IsDefined() {
			continue
		}
		esRef := remoteCluster.ElasticsearchRef.WithDefaultNamespace(associatedEs.Namespace)
		expectedRemoteClusters[esRef.NamespacedName()] = nil
	}

	var list esv1.ElasticsearchList
	if err := c.List(ctx, &list); err != nil {
		return nil, err
	}

	// Seek for Elasticsearch resources where this cluster is declared as a remote cluster.
	for i := range list.Items {
		// Index into the slice instead of ranging by value: Elasticsearch objects are large and
		// copying each one per iteration is wasted work.
		es := &list.Items[i]
		clientClusterName := k8s.ExtractNamespacedName(es)
		for _, remoteCluster := range es.Spec.RemoteClusters {
			if !remoteCluster.ElasticsearchRef.IsDefined() {
				continue
			}
			esRef := remoteCluster.ElasticsearchRef.WithDefaultNamespace(es.Namespace)
			if esRef.Namespace != associatedEs.Namespace || esRef.Name != associatedEs.Name {
				continue
			}
			expectedRemoteClusters[clientClusterName] = append(expectedRemoteClusters[clientClusterName], remoteCluster)
		}
	}
	return expectedRemoteClusters, nil
}
// getAssociatedRemoteCAs returns for a given Elasticsearch cluster all the Elasticsearch keys for which
// the remote certificate authorities have been copied, i.e. all the other Elasticsearch clusters for which this cluster
// has been involved in a remote cluster association.
// In order to get all of them we:
//  1. List all the remote CA copied locally, in the namespace of es, and read the remote cluster
//     namespace/name from their labels.
//  2. List, in all namespaces, the remote CA Secrets holding a copy of the CA of es, and read the
//     owning cluster from their namespace and cluster name label.
//
// Each listing uses its own SecretList so the second call never depends on the client implementation
// resetting the items returned by the first one.
func getAssociatedRemoteCAs(
	ctx context.Context,
	c k8s.Client,
	es types.NamespacedName,
) (map[types.NamespacedName]struct{}, error) {
	span, _ := apm.StartSpan(ctx, "get_current_remote_ca", tracing.SpanTypeApp)
	defer span.End()

	currentRemoteClusters := make(map[types.NamespacedName]struct{})

	// 1. Get clusters whose CA has been copied into the local namespace.
	var localRemoteCAs corev1.SecretList
	if err := c.List(ctx,
		&localRemoteCAs,
		client.InNamespace(es.Namespace),
		remoteca.Labels(es.Name),
	); err != nil {
		return nil, err
	}
	for i := range localRemoteCAs.Items {
		labels := localRemoteCAs.Items[i].Labels
		currentRemoteClusters[types.NamespacedName{
			Namespace: labels[RemoteClusterNamespaceLabelName],
			Name:      labels[RemoteClusterNameLabelName],
		}] = struct{}{}
	}

	// 2. Get clusters for which the CA of the local cluster has been copied.
	var copiedLocalCAs corev1.SecretList
	if err := c.List(ctx,
		&copiedLocalCAs,
		client.MatchingLabels(map[string]string{
			commonv1.TypeLabelName:          remoteca.TypeLabelValue,
			RemoteClusterNamespaceLabelName: es.Namespace,
			RemoteClusterNameLabelName:      es.Name,
		}),
	); err != nil {
		return nil, err
	}
	for i := range copiedLocalCAs.Items {
		copiedCA := &copiedLocalCAs.Items[i]
		currentRemoteClusters[types.NamespacedName{
			Namespace: copiedCA.Namespace,
			Name:      copiedCA.Labels[label.ClusterNameLabelName],
		}] = struct{}{}
	}

	return currentRemoteClusters, nil
}