pkg/controller/remotecluster/keystore/keystore.go (244 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package keystore import ( "context" "encoding/json" "fmt" "regexp" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/labels" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" ) const ( aliasesAnnotationName = "elasticsearch.k8s.elastic.co/remote-cluster-api-keys" RemoteClusterAPIKeysType = "remote-cluster-api-keys" ) var ( credentialsSecretSettingsRegEx = regexp.MustCompile(`^cluster\.remote\.([\w-]+)\.credentials$`) ) type APIKeyStore struct { log logr.Logger // aliases maps cluster aliased with the expected key ID aliases map[string]AliasValue // encodedKeys maps the remote cluster alias, as define in the client cluster, to the encoded cross-cluster API key. encodedKeys map[string]string // resourceVersion is the ResourceVersion as observed when the Secret has been loaded. resourceVersion string // uid is the UID of the Secret as observed when the Secret has been loaded. uid types.UID // pendingChanges are the pending changes, they are used to record changes until they are observed in the underlying Secret. pendingChanges *pendingChanges } type AliasValue struct { // Namespace of the remote cluster. Namespace string `json:"namespace"` // Name of the remote cluster. Name string `json:"name"` // ID is the key ID. ID string `json:"id"` } func (aks *APIKeyStore) GetAliases() map[string]AliasValue { if aks == nil { return nil } return aks.aliases } func (aks *APIKeyStore) KeyIDFor(alias string) string { if aks == nil { return "" } return aks.aliases[alias].ID } func loadAPIKeyStore(ctx context.Context, log logr.Logger, c k8s.Client, owner *esv1.Elasticsearch, pendingChanges *pendingChanges) (*APIKeyStore, error) { secretName := types.NamespacedName{ Name: esv1.RemoteAPIKeysSecretName(owner.Name), Namespace: owner.Namespace, } // Attempt to read the Secret keyStoreSecret := &corev1.Secret{} if err := c.Get(ctx, secretName, keyStoreSecret); err != nil { if errors.IsNotFound(err) { ulog.FromContext(ctx).V(1).Info("No APIKeyStore Secret found") // Return an empty store emptyKeystore := &APIKeyStore{log: log, pendingChanges: pendingChanges} return emptyKeystore.withPendingChanges(), nil } } // Read the key aliased aliases := make(map[string]AliasValue) if aliasesAnnotation, ok := keyStoreSecret.Annotations[aliasesAnnotationName]; ok { if err := json.Unmarshal([]byte(aliasesAnnotation), &aliases); err != nil { return nil, err } } // Read the current encoded cross-cluster API keys. encodedKeys := make(map[string]string) for settingName, encodedAPIKey := range keyStoreSecret.Data { strings := credentialsSecretSettingsRegEx.FindStringSubmatch(settingName) if len(strings) != 2 { ulog.FromContext(ctx).V(1).Info( fmt.Sprintf("Unknown remote cluster credential setting: %s", settingName), ) continue } encodedKeys[strings[1]] = string(encodedAPIKey) } apiKeyStore := &APIKeyStore{ log: log, aliases: aliases, encodedKeys: encodedKeys, resourceVersion: keyStoreSecret.ResourceVersion, uid: keyStoreSecret.UID, pendingChanges: pendingChanges, } return apiKeyStore.withPendingChanges(), nil } // withPendingChanges checks if the pending changes are reflected in the Secret. If it is the case these changes are removed from the expected changes. // If not there are "virtually" added to the current keystore. func (aks *APIKeyStore) withPendingChanges() *APIKeyStore { pendingChanges := aks.pendingChanges.Get() var pendingAdds, pendingDeletions int for _, pendingChange := range pendingChanges { if pendingChange.key.IsEmpty() { if aks.KeyIDFor(pendingChange.alias) == "" { aks.log.Info(fmt.Sprintf("Change for alias %s observed, key has been deleted in API keystore", pendingChange.alias)) aks.pendingChanges.ForgetChangeFor(pendingChange.alias) continue } // We are still expecting this deletion pendingDeletions++ aks.Delete(pendingChange.alias) continue } // Check if the key is available in the underlying Secret if keyIDInSecret := aks.KeyIDFor(pendingChange.alias); keyIDInSecret == pendingChange.key.keyID { aks.log.Info(fmt.Sprintf("Change for alias %s observed, key %s saved in API keystore", pendingChange.alias, keyIDInSecret)) // Forget this change aks.pendingChanges.ForgetChangeFor(pendingChange.alias) continue } // Change is not reflected in the Secret yet. pendingAdds++ aks.update(pendingChange.remoteClusterName, pendingChange.remoteClusterNamespace, pendingChange.alias, pendingChange.key.keyID, pendingChange.key.encodedValue) } if pendingAdds > 0 || pendingDeletions > 0 { aks.log.Info("Pending changes in API keystore", "add", pendingAdds, "deletion", pendingDeletions) } return aks } func (aks *APIKeyStore) Update(remoteClusterName, remoteClusterNamespace, alias, keyID, encodedKeyValue string) *APIKeyStore { // Save the change in memory aks.pendingChanges.AddKey(remoteClusterName, remoteClusterNamespace, alias, keyID, encodedKeyValue) // Load the change in this instance of the store aks.update(remoteClusterName, remoteClusterNamespace, alias, keyID, encodedKeyValue) return aks } func (aks *APIKeyStore) update(remoteClusterName, remoteClusterNamespace, alias, keyID, encodedKeyValue string) { if aks.aliases == nil { aks.aliases = make(map[string]AliasValue) } aks.aliases[alias] = AliasValue{ Namespace: remoteClusterNamespace, Name: remoteClusterName, ID: keyID, } if aks.encodedKeys == nil { aks.encodedKeys = make(map[string]string) } aks.encodedKeys[alias] = encodedKeyValue } func (aks *APIKeyStore) Aliases() []string { if aks == nil { return nil } aliases := make([]string, len(aks.aliases)) i := 0 for alias := range aks.aliases { aliases[i] = alias i++ } return aliases } func (aks *APIKeyStore) Delete(alias string) *APIKeyStore { // Save the change in memory aks.pendingChanges.DeleteAlias(alias) // Load the change in this instance of the store aks.delete(alias) return aks } func (aks *APIKeyStore) delete(alias string) *APIKeyStore { delete(aks.aliases, alias) delete(aks.encodedKeys, alias) return aks } const ( credentialsKeyFormat = "cluster.remote.%s.credentials" ) // Save synchronizes the in memory content of the API keystore into the Secret. func (aks *APIKeyStore) Save(ctx context.Context, c k8s.Client, owner *esv1.Elasticsearch) *reconciler.Results { secretName := types.NamespacedName{ Name: esv1.RemoteAPIKeysSecretName(owner.Name), Namespace: owner.Namespace, } if aks.IsEmpty() { return aks.deleteSecret(ctx, c, secretName) } results := &reconciler.Results{} aliases, err := json.Marshal(aks.aliases) if err != nil { return results.WithError(err) } data := make(map[string][]byte, len(aks.encodedKeys)) for k, v := range aks.encodedKeys { data[fmt.Sprintf(credentialsKeyFormat, k)] = []byte(v) } expectedLabels := labels.AddCredentialsLabel(label.NewLabels(k8s.ExtractNamespacedName(owner))) expectedLabels[commonv1.TypeLabelName] = RemoteClusterAPIKeysType expected := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: secretName.Name, Namespace: secretName.Namespace, Annotations: map[string]string{ aliasesAnnotationName: string(aliases), }, Labels: expectedLabels, }, Data: data, } if _, err := reconciler.ReconcileSecret(ctx, c, expected, owner); err != nil { if errors.IsConflict(err) { return results.WithResult(reconcile.Result{Requeue: true}) } return results.WithError(err) } return nil } func (aks *APIKeyStore) deleteSecret(ctx context.Context, c k8s.Client, secretName types.NamespacedName) *reconciler.Results { // Delete the Secret used to load the current state. deleteOptions := make([]client.DeleteOption, 0, 2) if aks.uid != "" { deleteOptions = append(deleteOptions, &client.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &aks.uid}}) } if aks.resourceVersion != "" { deleteOptions = append(deleteOptions, &client.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &aks.resourceVersion}}) } results := &reconciler.Results{} if err := c.Delete(ctx, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: secretName.Name, Namespace: secretName.Namespace}}, deleteOptions..., ); err != nil { if errors.IsNotFound(err) { return nil } if errors.IsConflict(err) { return results.WithResult(reconcile.Result{Requeue: true}) } return results.WithError(err) } return nil } func (aks *APIKeyStore) IsEmpty() bool { if aks == nil { return true } return len(aks.aliases) == 0 } // ForCluster returns func (aks *APIKeyStore) ForCluster(namespace string, name string) sets.Set[string] { aliases := sets.New[string]() if aks == nil { return aliases } for alias, c := range aks.aliases { if c.Name == name && c.Namespace == namespace { aliases.Insert(alias) } } return aliases }